Redesign store interface

This commit is contained in:
Ehmry - 2017-12-20 20:54:30 -06:00
parent 3c83a65341
commit 536f9225f4
9 changed files with 500 additions and 334 deletions

View File

@ -21,6 +21,10 @@ Provided for illustrating canonicalized CBOR encoding.
Standard Lisp `cons` function, prepend to a list.
#### `(copy <cid> <from> <to>)`
Duplicate a directory entry.
#### `(define <symbol> <value>)`
Bind a value to a symbol. Returns the value.
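
For illustration, a hypothetical session combining the forms above, where `<cid>` stands for a real directory CID:

    > (define d <cid>)
    > (copy d "notes.txt" "notes-backup.txt")

`define` returns the bound value, so `d` may be used anywhere a CID is expected.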

View File

@ -1,10 +1,16 @@
import asyncdispatch, asyncstreams, httpclient, json, base58.bitcoin, streams, nimSHA2, cbor, tables
import ipld, multiformats, ipldstore
import ipld, multiformats, ipldstore, unixfs
type
IpfsStore* = ref IpfsStoreObj
IpfsStoreObj = object of IpldStoreObj
## IPFS daemon client.
http: HttpClient
baseUrl: string
AsyncIpfsStore* = ref AsyncIpfsStoreObj
AsyncIpfsStoreObj = object of AsyncIpldStoreObj
## IPFS daemon client.
http: AsyncHttpClient
baseUrl: string
@ -13,9 +19,14 @@ proc ipfsClose(s: IpldStore) =
var ipfs = IpfsStore(s)
close ipfs.http
proc putBlockBase(ipfs: IpfsStore; data: string; format = "raw"): Future[tuple[key: string, size: int]] {.async.} =
proc ipfsAsyncClose(s: AsyncIpldStore) =
var ipfs = AsyncIpfsStore(s)
close ipfs.http
proc putBlock(ipfs: IpfsStore | AsyncIpfsStore; data: string; format = "raw"): Future[tuple[key: string, size: int]] {.multisync.} =
# wrap the block in a multipart/form-data body so the daemon will accept it
ipfs.http.headers = newHttpHeaders({"Content-Type": "multipart/form-data; boundary=------------------------KILL_A_WEBDEV"})
ipfs.http.headers = newHttpHeaders({
"Content-Type": "multipart/form-data; boundary=------------------------KILL_A_WEBDEV"})
let
trash = """
@ -33,67 +44,89 @@ Content-Type: application/octet-stream
# You can tell it's written in Go when the JSON keys had to be capitalized
result = (js["Key"].getStr, js["Size"].getNum.int)
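
The `{.multisync.}` pragma used above compiles a single body into both a blocking proc (taking `HttpClient`) and an `async` proc (taking `AsyncHttpClient`); `await` is stripped from the synchronous instantiation. A minimal standalone sketch of the pattern, with a hypothetical `fetchBody`:

import asyncdispatch, httpclient

proc fetchBody(client: HttpClient | AsyncHttpClient;
               url: string): Future[string] {.multisync.} =
  # `request` and `body` resolve to the sync or async variants
  # depending on which proc was instantiated
  let resp = await client.request(url)
  result = await resp.body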
proc ipfsPutRaw(s: IpldStore; blk: string): Future[Cid] {.async.} =
proc ipfsPut(s: IpldStore; blk: string): Cid =
var ipfs = IpfsStore(s)
let
cid = blk.CidSha256
resp = await ipfs.putBlockBase(blk, "raw")
isDag = blk.isUnixfs
tag = if isDag: MulticodecTag.DagCbor else: MulticodecTag.Raw
format = if isDag: "cbor" else: "raw"
cid = blk.CidSha256(tag)
resp = ipfs.putBlock(blk, format)
rCid = parseCid resp.key
if rCid != cid:
echo "IPFS CID mismatch"
raise newException(SystemError, "wanted " & cid.toHex & " got " & rCid.toHex)
if blk.len != resp.size:
echo "IPFS daemon returned a size mismatch, sent " & $blk.len & " got " & $resp.size
result = cid
proc ipfsAsyncPut(s: AsyncIpldStore; blk: string): Future[Cid] {.async.} =
var ipfs = AsyncIpfsStore(s)
let
isDag = blk.isUnixfs
tag = if isDag: MulticodecTag.DagCbor else: MulticodecTag.Raw
format = if isDag: "cbor" else: "raw"
cid = blk.CidSha256(tag)
resp = await ipfs.putBlock(blk, format)
rCid = parseCid resp.key
if rCid != cid:
raise newException(SystemError, "wanted " & cid.toHex & " got " & rCid.toHex)
if blk.len != resp.size:
raise newException(SystemError, "IPFS daemon returned a size mismatch")
echo "IPFS daemon returned a size mismatch, sent " & $blk.len & " got " & $resp.size
result = cid
proc ipfsPutDag(s: IpldStore; dag: Dag): Future[Cid] {.async.} =
proc ipfsGetBuffer(s: IpldStore; cid: Cid; buf: pointer; len: Natural): int =
var ipfs = IpfsStore(s)
let
blk = dag.toBinary
cid = blk.CidSha256(MulticodecTag.DagCbor)
resp = await ipfs.putBlockBase(blk, "cbor")
rCid = parseCid resp.key
if rCid != cid:
raise newException(SystemError, "wanted " & cid.toHex & " got " & rCid.toHex)
if blk.len != resp.size:
raise newException(SystemError, "IPFS daemon returned a size mismatch")
result = cid
let url = ipfs.baseUrl & "/api/v0/block/get?arg=" & $cid
try:
var body = ipfs.http.request(url).body
if not verify(cid, body):
raise newMissingObject cid
if body.len > len:
raise newException(BufferTooSmall, "block is larger than the provided buffer")
result = body.len
copyMem(buf, body[0].addr, result)
except:
raise newMissingObject cid
proc ipfsGetRaw(s: IpldStore; cid: Cid): Future[string] {.async.} =
proc ipfsGet(s: IpldStore; cid: Cid; result: var string) =
var ipfs = IpfsStore(s)
let
url = ipfs.baseUrl & "/api/v0/block/get?arg=" & $cid
resp = await ipfs.http.request(url)
result = await resp.body
let url = ipfs.baseUrl & "/api/v0/block/get?arg=" & $cid
try:
result = ipfs.http.request(url).body
if not verify(cid, result):
raise newMissingObject cid
except:
raise newMissingObject cid
proc ipfsGetDag(s: IpldStore; cid: Cid): Future[Dag] {.async.} =
var ipfs = IpfsStore(s)
let
blk = await ipfs.ipfsGetRaw(cid)
result = parseDag blk
proc ipfsFileStreamRecurse(ipfs: IpfsStore; cid: Cid; fut: FutureStream[string]) {.async.} =
if cid.isRaw:
let chunk = await ipfs.ipfsGetRaw(cid)
await fut.write chunk
elif cid.isDagCbor:
let dag = await ipfs.getDag(cid)
for link in dag["links"].items:
let linkCid = parseCid link["cid"].getBytes
await ipfs.fileStream(linkCid, fut)
else: discard
proc ipfsFileStream(s: IpldStore; cid: Cid; fut: FutureStream[string]) {.async.} =
var ipfs = IpfsStore(s)
await ipfs.ipfsFileStreamRecurse(cid, fut)
complete fut
proc ipfsAsyncGet(s: AsyncIpldStore; cid: Cid): Future[string] {.async.} =
var ipfs = AsyncIpfsStore(s)
let url = ipfs.baseUrl & "/api/v0/block/get?arg=" & $cid
try:
let resp = await ipfs.http.request(url)
result = await resp.body
if not verify(cid, result):
raise newMissingObject cid
except:
raise newMissingObject cid
proc newIpfsStore*(url = "http://127.0.0.1:5001"): IpfsStore =
## Allocate a new synchronous store interface to the IPFS daemon at `url`.
## Every block retrieved by `get` is hashed and verified.
new result
result.closeImpl = ipfsClose
result.putRawImpl = ipfsPutRaw
result.getRawImpl = ipfsGetRaw
result.putDagImpl = ipfsPutDag
result.getDagImpl = ipfsGetDag
result.fileStreamImpl = ipfsFileStream
result.putImpl = ipfsPut
result.getBufferImpl = ipfsGetBuffer
result.getImpl = ipfsGet
result.http = newHttpClient()
result.baseUrl = url
proc newAsyncIpfsStore*(url = "http://127.0.0.1:5001"): AsyncIpfsStore =
## Allocate a new asynchronous store interface to the IPFS daemon at `url`.
## Every block retrieved by `get` is hashed and verified.
new result
result.closeImpl = ipfsAsyncClose
result.putImpl = ipfsAsyncPut
result.getImpl = ipfsAsyncGet
result.http = newAsyncHttpClient()
result.baseUrl = url
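
A brief usage sketch of the redesigned synchronous interface, assuming a daemon on the default port; `put` now returns the Cid directly and `get` fills a caller-supplied string:

let store = newIpfsStore()
let cid = store.put("some raw block")  # block is hashed, daemon's CID checked
var blk = ""
store.get(cid, blk)                    # retrieved block is verified against cid
assert blk == "some raw block"
close store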

View File

@ -1,5 +1,7 @@
import nimSHA2, streams, multiformats, base58.bitcoin, cbor, hex, hashes
const MaxBlockSize* = 1 shl 18 # 256 KiB, the maximum size of a block
type Cid* = object
digest*: string
hash*: MulticodecTag

View File

@ -9,4 +9,4 @@ license = "GPLv3"
requires "nim >= 0.17.3", "nimSHA2", "base58", "cbor >= 0.2.0"
bin = @["ipldrepl"]
bin = @["ipldrepl","ipldcat"]

ipldcat.nim Normal file (33 lines)
View File

@ -0,0 +1,33 @@
import streams, os, parseopt
import ipfsdaemon, ipldstore, ipld, unixfs
proc readFile(store: IpldStore; s: Stream; cid: Cid) =
var chunk = ""
let file = store.open(cid)
assert(not file.isNil)
assert(file.isFile)
if file.cid.isRaw:
store.get(file.cid, chunk)
s.write chunk
else:
var n = 0
for i in 0..file.links.high:
store.get(file.links[i].cid, chunk)
doAssert(n+chunk.len <= file.size)
s.write chunk
n.inc chunk.len
doAssert(n == file.size)
let
store = newIpfsStore("http://127.0.0.1:5001")
stream = stdout.newFileStream
for kind, key, value in getopt():
if kind == cmdArgument:
let cid = key.parseCid
if cid.isRaw:
let chunk = store.get(cid)
stream.write chunk
else:
readFile(store, stream, cid)
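
The tool takes CIDs as command-line arguments and writes the decoded data to stdout, so a hypothetical invocation would be `ipldcat <cid> > out`.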

View File

@ -1,4 +1,4 @@
import rdstdin, nre, os, strutils, tables, asyncdispatch, asyncstreams, parseopt, streams, cbor
import nre, os, strutils, tables, parseopt, streams, cbor
import ipld, ipldstore, unixfs, multiformats
@ -11,8 +11,6 @@ type
AtomKind = enum
atomPath
atomCid
atomFile
atomDir
atomString
atomSymbol
atomError
@ -23,12 +21,6 @@ type
path: string
of atomCid:
cid: Cid
of atomFile:
fName: string
file: UnixfsNode
of atomDir:
dName: string
dir: UnixfsNode
of atomString:
str: string
of atomSymbol:
@ -36,7 +28,7 @@ type
of atomError:
err: string
Func = proc(env: Env; arg: NodeObj): Node
Func = proc(env: Env; arg: NodeObj): NodeRef
NodeKind = enum
nodeError
@ -44,9 +36,10 @@ type
nodeAtom
nodeFunc
Node = ref NodeObj
NodeRef = ref NodeObj
## NodeRef is used to chain nodes into lists.
NodeObj = object
## NodeObj is used to mutate nodes without side-effects.
case kind: NodeKind
of nodeList:
headRef, tailRef: NodeRef
@ -61,18 +54,13 @@ type
nextRef: NodeRef
EnvObj = object
store: FileStore
store: IpldStore
bindings: Table[string, NodeObj]
paths: Table[string, UnixfsNode]
cids: Table[Cid, UnixfsNode]
when not defined(release):
pathCacheHit: int
pathCacheMiss: int
cidCacheHit: int
cidCacheMiss: int
proc print(a: Atom; s: Stream)
proc print(ast: Node; s: Stream)
proc print(ast: NodeRef; s: Stream)
proc newAtom(c: Cid): Atom =
Atom(kind: atomCid, cid: c)
@ -90,28 +78,20 @@ proc newAtomPath(s: string): Atom =
proc newAtomString(s: string): Atom =
Atom(kind: atomString, str: s)
#[
template evalAssert(cond, n: Node, msg = "") =
if not cond:
let err = newException(EvalError, msg)
err.node = n
raise err
]#
proc newNodeError(msg: string; n: NodeObj): NodeRef =
var p = new Node
var p = new NodeRef
p[] = n
Node(kind: nodeError, errMsg: msg, errNode: p)
NodeRef(kind: nodeError, errMsg: msg, errNode: p)
proc newNode(a: Atom): Node =
Node(kind: nodeAtom, atom: a)
proc newNode(a: Atom): NodeRef =
NodeRef(kind: nodeAtom, atom: a)
proc newNodeList(): Node =
Node(kind: nodeList)
proc newNodeList(): NodeRef =
NodeRef(kind: nodeList)
proc next(n: NodeObj | NodeRef): NodeObj =
## Return a copy of the list element that follows node n.
assert(not n.nextRef.isNil)
assert(not n.nextRef.isNil, "next element is nil")
result = n.nextRef[]
proc head(list: NodeObj | NodeRef): NodeObj =
@ -120,7 +100,7 @@ proc head(list: NodeObj | NodeRef): NodeObj =
proc `next=`(n, p: NodeRef) =
## Set the list element that follows node n.
assert(n.nextRef.isNil)
assert(n.nextRef.isNil, "append to node that is not at the end of a list")
n.nextRef = p
iterator list(n: NodeObj): NodeObj =
@ -157,42 +137,25 @@ proc append(list: NodeRef; n: NodeObj) =
proc getFile(env: Env; path: string): UnixFsNode =
result = env.paths.getOrDefault path
if result.isNil:
result = waitFor env.store.addFile(path)
result = env.store.addFile(path)
assert(not result.isNil)
env.paths[path] = result
when not defined(release):
inc env.pathCacheMiss
else:
when not defined(release):
inc env.pathCacheHit
else: discard
proc getDir(env: Env; path: string): UnixFsNode =
result = env.paths.getOrDefault path
if result.isNil:
result = waitFor env.store.addDir(path)
result = env.store.addDir(path)
assert(not result.isNil)
env.paths[path] = result
when not defined(release):
inc env.pathCacheMiss
else:
when not defined(release):
inc env.pathCacheHit
else: discard
proc getUnixfs(env: Env; cid: Cid): UnixFsNode =
assert cid.isValid
result = env.cids.getOrDefault cid
if result.isNil:
let raw = waitFor env.store.get(cid)
var raw = ""
env.store.get(cid, raw)
result = parseUnixfs(raw, cid)
env.cids[cid] = result
when not defined(release):
inc env.cidCacheMiss
else:
when not defined(release):
inc env.cidCacheHit
else: discard
type
Tokens = seq[string]
@ -220,17 +183,6 @@ proc print(a: Atom; s: Stream) =
s.write a.path
of atomCid:
s.write $a.cid
of atomFile:
s.write $a.file.cid
s.write ':'
s.write a.fName
s.write ':'
s.write $a.file.size
of atomDir:
s.write "\n"
s.write $a.dir.cid
s.write ':'
s.write a.dName
of atomString:
s.write '"'
s.write a.str
@ -240,7 +192,7 @@ proc print(a: Atom; s: Stream) =
let fut = newFutureStream[string]()
asyncCheck env.store.fileStream(a.fileCid, fut)
while true:
let (valid, chunk) = waitFor fut.read()
let (valid, chunk) = fut.read()
if not valid: break
f.write chunk
]#
@ -256,11 +208,11 @@ proc print(ast: NodeObj; s: Stream) =
of nodeAtom:
ast.atom.print(s)
of nodeList:
s.write "("
s.write "\n("
for n in ast.list:
s.write "\n"
s.write " "
n.print(s)
s.write "\n)"
s.write ")"
of nodeFunc:
s.write "#<procedure "
s.write ast.name
@ -273,7 +225,10 @@ proc print(ast: NodeObj; s: Stream) =
s.write "»"
proc print(ast: NodeRef; s: Stream) =
ast[].print s
if ast.isNil:
s.write "«nil»"
else:
ast[].print s
proc readAtom(r: Reader): Atom =
let token = r.next
@ -293,25 +248,9 @@ proc readAtom(r: Reader): Atom =
except:
newAtomError(getCurrentExceptionMsg())
#[
proc chainData(end: Atom; a: Atom; env: Env): Atom =
## Convert an atom to data and chain it to the end of a data chain,
## return the new end of the chain.
var next: Atom
case a.kind:
of atomData:
next = a
else: discard
if end.isNil:
result = next
else:
doAssert(end.nextData.isNil)
end.nextData = next
]#
proc readForm(r: Reader): NodeRef
proc readForm(r: Reader): Node
proc readList(r: Reader): Node =
proc readList(r: Reader): NodeRef =
result = newNodeList()
while true:
if (r.pos == r.tokens.len):
@ -324,7 +263,7 @@ proc readList(r: Reader): Node =
else:
result.append r.readForm
proc readForm(r: Reader): Node =
proc readForm(r: Reader): NodeRef =
case r.peek[0]
of '(':
discard r.next
@ -341,7 +280,7 @@ proc tokenizer(s: string): Tokens =
if t.len > 0:
result.add t
proc read(r: Reader; line: string): Node =
proc read(r: Reader; line: string): NodeRef =
r.pos = 0
if r.buffer.len > 0:
r.buffer.add " "
@ -355,27 +294,26 @@ proc read(r: Reader; line: string): Node =
else:
r.buffer.setLen 0
proc assertArgCount(args: NodeObj; len: int) =
var arg = args
for _ in 2..len:
doAssert(not arg.nextRef.isNil)
arg = arg.next
doAssert(arg.nextRef.isNil)
##
# Builtin functions
#
proc applyFunc(env: Env; args: NodeObj): Node =
proc applyFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
let
fn = args
ln = fn.next
fn.fun(env, ln.head)
proc catFunc(env: Env; arg: NodeObj): Node =
#[
result = Atom(kind: atomData).newNode
var atom = result.atom
for n in args:
assert(n.kind == nodeAtom, "cat called on a non-atomic node")
#atom = atom.chainData(n.atom, env)
]#
result = newNodeError("cat not implemented", arg)
proc cborFunc(env: Env; arg: NodeObj): Node =
proc cborFunc(env: Env; arg: NodeObj): NodeRef =
assertArgCount(arg, 1)
let a = arg.atom
if a.cid.isDagCbor:
let
@ -385,7 +323,22 @@ proc cborFunc(env: Env; arg: NodeObj): Node =
else:
"".newAtomString.newNode
proc consFunc(env: Env; args: NodeObj): Node =
proc copyFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 3)
let
x = args
y = x.next
z = y.next
var root = newUnixFsRoot()
let dir = env.getUnixfs x.atom.cid
for name, node in dir.items:
root.add(name, node)
root.add(z.atom.str, dir[y.atom.str])
let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
proc consFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
result = newNodeList()
let
car = args
@ -393,7 +346,8 @@ proc consFunc(env: Env; args: NodeObj): Node =
result.append car
result.append cdr.head
proc defineFunc(env: Env; args: NodeObj): Node =
proc defineFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
let
symN = args
val = args.next
@ -401,14 +355,14 @@ proc defineFunc(env: Env; args: NodeObj): Node =
new result
result[] = val
proc dumpFunc(env: Env; args: NodeObj): Node =
proc dumpFunc(env: Env; args: NodeObj): NodeRef =
result = newNodeList()
for n in args.walk:
let a = n.atom
for p in env.store.dumpPaths(a.cid):
result.append p.newAtomString.newNode
proc globFunc(env: Env; args: NodeObj): Node =
proc globFunc(env: Env; args: NodeObj): NodeRef =
result = newNodeList()
for n in args.walk:
let a = n.atom
@ -421,7 +375,7 @@ proc globFunc(env: Env; args: NodeObj): Node =
else:
result = newNodeError("invalid glob argument", n)
proc ingestFunc(env: Env; args: NodeObj): Node =
proc ingestFunc(env: Env; args: NodeObj): NodeRef =
var root = newUnixFsRoot()
for n in args.walk:
let
@ -436,10 +390,10 @@ proc ingestFunc(env: Env; args: NodeObj): Node =
let dir = env.getDir a.path
root.add(name, dir)
let
cid = waitFor env.store.putDag(root.toCbor)
cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
proc listFunc(env: Env; args: NodeObj): Node =
proc listFunc(env: Env; args: NodeObj): NodeRef =
## Standard Lisp 'list' function.
result = newNodeList()
new result.headRef
@ -448,7 +402,7 @@ proc listFunc(env: Env; args: NodeObj): Node =
while not result.tailRef.nextRef.isNil:
result.tailRef = result.tailRef.nextRef
proc lsFunc(env: Env; args: NodeObj): Node =
proc lsFunc(env: Env; args: NodeObj): NodeRef =
result = newNodeList()
for n in args.walk:
let a = n.atom
@ -458,21 +412,21 @@ proc lsFunc(env: Env; args: NodeObj): Node =
for name, u in ufsNode.items:
assert(not name.isNil)
assert(not u.isNil, name & " is nil")
case u.kind:
of fileNode, shallowFile:
result.append Atom(kind: atomFile, fName: name, file: u).newNode
of dirNode, shallowDir:
result.append Atom(kind: atomDir, dName: name, dir: u).newNode
let e = newNodeList()
e.append u.cid.newAtom.newNode
e.append name.newAtomString.newNode
result.append e
else:
raiseAssert("ls over a raw IPLD block")
proc mapFunc(env: Env; args: NodeObj): Node =
proc mapFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
result = newNodeList()
let f = args.fun
for v in args.next.list:
result.append f(env, v)
proc mergeFunc(env: Env; args: NodeObj): Node =
proc mergeFunc(env: Env; args: NodeObj): NodeRef =
var root = newUnixFsRoot()
for n in args.walk:
let a = n.atom
@ -480,29 +434,29 @@ proc mergeFunc(env: Env; args: NodeObj): Node =
let dir = env.getUnixfs a.cid
for name, node in dir.items:
root.add(name, node)
let cid = waitFor env.store.putDag(root.toCbor)
let cid = env.store.putDag(root.toCbor)
cid.newAtom.newNode
proc pathFunc(env: Env; arg: NodeObj): Node =
proc pathFunc(env: Env; arg: NodeObj): NodeRef =
result = arg.atom.str.newAtomPath.newNode
proc rootFunc(env: Env; args: NodeObj): Node =
proc rootFunc(env: Env; args: NodeObj): NodeRef =
var root = newUnixFsRoot()
let
name = args.atom.str
cid = args.next.atom.cid
ufs = env.getUnixfs cid
root.add(name, ufs)
let rootCid = waitFor env.store.putDag(root.toCbor)
let rootCid = env.store.putDag(root.toCbor)
rootCid.newAtom.newNode
proc walkFunc(env: Env; args: NodeObj): Node =
proc walkFunc(env: Env; args: NodeObj): NodeRef =
assert args.atom.cid.isValid
let
rootCid = args.atom.cid
walkPath = args.next.atom.str
root = env.getUnixfs rootCid
final = waitFor env.store.walk(root, walkPath)
final = env.store.walk(root, walkPath)
if final.isNil:
result = newNodeError("no walk to '$1'" % walkPath, args)
else:
@ -516,16 +470,16 @@ proc bindEnv(env: Env; name: string; fun: Func) =
assert(not env.bindings.contains name)
env.bindings[name] = NodeObj(kind: nodeFunc, fun: fun, name: name)
proc newEnv(storePath: string): Env =
proc newEnv(store: IpldStore): Env =
result = Env(
store: newFileStore(storePath),
store: store,
bindings: initTable[string, NodeObj](),
paths: initTable[string, UnixfsNode](),
cids: initTable[Cid, UnixfsNode]())
result.bindEnv "apply", applyFunc
result.bindEnv "cat", catFunc
result.bindEnv "cbor", cborFunc
result.bindEnv "cons", consFunc
result.bindEnv "copy", copyFunc
result.bindEnv "define", defineFunc
result.bindEnv "dump", dumpFunc
result.bindEnv "glob", globFunc
@ -537,9 +491,10 @@ proc newEnv(storePath: string): Env =
result.bindEnv "path", pathFunc
result.bindEnv "root", rootFunc
result.bindEnv "walk", walkFunc
proc eval(ast: Node; env: Env): Node
proc eval_ast(ast: Node; env: Env): Node =
proc eval(ast: NodeRef; env: Env): NodeRef
proc eval_ast(ast: NodeRef; env: Env): NodeRef =
result = ast
case ast.kind
of nodeList:
@ -554,11 +509,12 @@ proc eval_ast(ast: Node; env: Env): Node =
of nodeAtom:
if ast.atom.kind == atomSymbol:
if env.bindings.contains ast.atom.sym:
result = new Node
result = new NodeRef
result[] = env.bindings[ast.atom.sym]
else: discard
proc eval(ast: Node; env: Env): Node =
proc eval(ast: NodeRef; env: Env): NodeRef =
var input = ast[]
try:
if ast.kind == nodeList:
if ast.headRef == nil:
@ -568,51 +524,79 @@ proc eval(ast: Node; env: Env): Node =
ast = eval_ast(ast, env)
head = ast.headRef
if head.kind == nodeFunc:
head.fun(env, head.next)
if not head.nextRef.isNil:
input = head.next
head.fun(env, input)
else:
input = NodeObj(kind: nodeList)
head.fun(env, input)
else:
newNodeError("not a function", head[])
input = head[]
newNodeError("not a function", input)
else:
eval_ast(ast, env)
except EvalError:
newNodeError(getCurrentExceptionMsg(), ast[])
newNodeError(getCurrentExceptionMsg(), input)
except FieldError:
newNodeError("invalid argument", ast[])
newNodeError("invalid argument", input)
except MissingObject:
let e = (MissingObject)getCurrentException()
newNodeError($e.cid & " not in store", ast[])
newNodeError("object not in store", input)
except OSError:
newNodeError(getCurrentExceptionMsg(), input)
proc main() =
var
env: Env
interactive: bool
block:
var scripted = false
when defined(genode):
import ipldclient
proc openStore(): IpldStore =
result = newIpldClient("repl")
scripted = true # do not use linenoise for the moment
#[
for kind, key, value in getopt():
if kind == cmdShortOption and key == "s":
scripted = true
else:
quit "unhandled argument " & key
]#
else:
import ipfsdaemon
proc openStore(): IpldStore =
for kind, key, value in getopt():
case kind
of cmdArgument:
if not env.isNil:
quit "only a single store path argument is accepted"
env = newEnv(key)
of cmdLongOption:
if key == "interactive":
interactive = true
of cmdShortOption:
if key == "i":
interactive = true
of cmdEnd:
discard
if env.isNil:
quit "store path must be passed as an argument"
if key == "s":
scripted = true
else:
quit "unhandled argument " & key
of cmdArgument:
if not result.isNil:
quit "only a single store path argument is accepted"
try:
result = if key.startsWith "http://":
newIpfsStore(key) else: newFileStore(key)
except:
quit("failed to open store at $1 ($2)" % [key, getCurrentExceptionMsg()])
else:
quit "unhandled argument " & key
if result.isNil:
quit "IPFS daemon URL must be specified"
import rdstdin
proc readLineSimple(prompt: string; line: var TaintedString): bool =
stdin.readLine(line)
proc main() =
let
store = openStore()
env = newEnv(store)
outStream = stdout.newFileStream
readLine = if scripted: readLineSimple else: readLineFromStdin
let outStream = stdout.newFileStream
var
reader = newReader()
line = newStringOfCap 128
while true:
if not stdin.readLine line:
when not defined(release):
stderr.writeLine "Path cache miss/hit ", env.pathCacheMiss, "/", env.pathCacheHit
stderr.writeLine " CID cache miss/hit ", env.cidCacheMiss, "/", env.cidCacheHit
quit()
while readLine("> ", line):
if line.len > 0:
let ast = reader.read(line)
if not ast.isNil:
@ -621,3 +605,4 @@ proc main() =
flush outStream
main()
quit 0 # Genode doesn't implicitly quit
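
Note that outside of Genode the store argument may now be either a daemon URL or a file-store path, so hypothetical invocations would be `ipldrepl http://127.0.0.1:5001` or `ipldrepl ./store`, with `-s` selecting scripted (non-linenoise) input.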

ipldreplicator.nim Normal file (54 lines)
View File

@ -0,0 +1,54 @@
import streams, strutils, os, ipld, cbor, multiformats, hex,
ipldstore, ipldclient
type
IpldReplicator* = ref IpldReplicatorObj
IpldReplicatorObj* = object of IpldStoreObj
toStore, fromStore: IpldStore
cache: string
cacheCid: Cid
proc replicatedPut(s: IpldStore; blk: string): Cid =
var r = IpldReplicator s
r.toStore.put(blk)
proc replicatedGetBuffer(s: IpldStore; cid: Cid; buf: pointer; len: Natural): int =
var r = IpldReplicator s
if r.cacheCid == cid:
assert(cid.verify(r.cache), "cached block is invalid from previous get")
if r.cache.len > len:
raise newException(BufferTooSmall, "block is larger than the provided buffer")
result = r.cache.len
copyMem(buf, r.cache[0].addr, result)
else:
try:
result = r.toStore.getBuffer(cid, buf, len)
r.cacheCid = cid
r.cache.setLen result
copyMem(r.cache[0].addr, buf, result)
assert(cid.verify(r.cache), "cached block is invalid after copy from To store")
except MissingObject:
result = r.fromStore.getBuffer(cid, buf, len)
r.cacheCid = cid
r.cache.setLen result
copyMem(r.cache[0].addr, buf, result)
assert(cid.verify(r.cache), "replicate cache is invalid after copy from From store")
discard r.toStore.put(r.cache)
proc replicatedGet(s: IpldStore; cid: Cid; result: var string) =
var r = IpldReplicator s
try: r.toStore.get(cid, result)
except MissingObject:
r.fromStore.get(cid, result)
discard r.toStore.put(result)
proc newIpldReplicator*(toStore, fromStore: IpldStore): IpldReplicator =
## Blocks retrieved by `get` are not verified.
IpldReplicator(
putImpl: replicatedPut,
getBufferImpl: replicatedGetBuffer,
getImpl: replicatedGet,
toStore: toStore,
fromStore: fromStore,
cache: "",
cacheCid: initCid())
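
A sketch of wiring the replicator between a local file store and the daemon; `wanted` is a hypothetical Cid already held by the caller:

let
  cache = newFileStore("/tmp/ipld-cache")
  daemon = newIpfsStore()
  repl = newIpldReplicator(cache, daemon)
var blk = ""
# a cache miss falls back to the daemon and copies the block into the cache
repl.get(wanted, blk)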

View File

@ -1,9 +1,11 @@
import asyncdispatch, asyncfile, streams, strutils, os, ipld, cbor, multiformats, hex, ropes
import asyncdispatch, asyncfile, streams, strutils, os, ipld, cbor, multiformats, hex
type
MissingObject* = ref object of SystemError
cid*: Cid ## Missing object identifier
BufferTooSmall* = object of SystemError
proc newMissingObject*(cid: Cid): MissingObject =
MissingObject(msg: "object missing from store", cid: cid)
@ -11,71 +13,79 @@ type
IpldStore* = ref IpldStoreObj
IpldStoreObj* = object of RootObj
closeImpl*: proc (s: IpldStore) {.nimcall, gcsafe.}
putImpl*: proc (s: IpldStore; blk: string): Future[Cid] {.nimcall, gcsafe.}
getImpl*: proc (s: IpldStore; cid: Cid): Future[string] {.nimcall, gcsafe.}
fileStreamImpl*: proc (s: IpldStore; cid: Cid; fut: FutureStream[string]): Future[void] {.nimcall, gcsafe.}
putImpl*: proc (s: IpldStore; blk: string): Cid {.nimcall, gcsafe.}
getBufferImpl*: proc (s: IpldStore; cid: Cid; buf: pointer; len: Natural): int {.nimcall, gcsafe.}
getImpl*: proc (s: IpldStore; cid: Cid; result: var string) {.nimcall, gcsafe.}
proc close*(s: IpldStore) =
AsyncIpldStore* = ref AsyncIpldStoreObj
AsyncIpldStoreObj* = object of RootObj
closeImpl*: proc (s: AsyncIpldStore) {.nimcall, gcsafe.}
putImpl*: proc (s: AsyncIpldStore; blk: string): Future[Cid] {.nimcall, gcsafe.}
getImpl*: proc (s: AsyncIpldStore; cid: Cid): Future[string] {.nimcall, gcsafe.}
proc close*(s: IpldStore | AsyncIpldStore) =
## Close active store resources.
if not s.closeImpl.isNil: s.closeImpl(s)
proc put*(s: IpldStore; blk: string): Future[Cid] =
proc put*(s: IpldStore; blk: string): Cid =
## Place a raw block into the store.
assert(not s.putImpl.isNil)
assert(blk.len > 0)
s.putImpl(s, blk)
proc put*(s: AsyncIpldStore; blk: string): Future[Cid] =
## Place a raw block into the store.
assert(not s.putImpl.isNil)
s.putImpl(s, blk)
proc get*(s: IpldStore; cid: Cid): Future[string] =
proc getBuffer*(s: IpldStore; cid: Cid; buf: pointer; len: Natural): int =
## Copy a raw block from the store into a buffer pointer.
assert cid.isValid
assert(not s.getBufferImpl.isNil)
result = s.getBufferImpl(s, cid, buf, len)
assert(result > 0)
proc get*(s: IpldStore; cid: Cid; result: var string) =
## Retrieve a raw block from the store.
assert(not s.getImpl.isNil)
assert cid.isValid
assert(not result.isNil)
s.getImpl(s, cid, result)
assert(result.len > 0)
proc get*(s: IpldStore; cid: Cid): string =
## Retrieve a raw block from the store.
result = ""
s.get(cid, result)
proc get*(s: AsyncIpldStore; cid: Cid): Future[string] =
## Retrieve a raw block from the store.
assert cid.isValid
assert(not s.getImpl.isNil)
s.getImpl(s, cid)
{.deprecated: [putRaw: put, getRaw: get].}
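
For illustration, a minimal in-memory backend against this dispatch table might look like the sketch below. `MemStore` is hypothetical; it assumes `hash` is defined for `Cid` and leaves `getBufferImpl` nil, so `getBuffer` must not be called on it:

import tables

type
  MemStore = ref MemStoreObj
  MemStoreObj = object of IpldStoreObj
    blocks: Table[Cid, string]

proc memPut(s: IpldStore; blk: string): Cid =
  var m = MemStore(s)
  result = blk.CidSha256
  m.blocks[result] = blk

proc memGet(s: IpldStore; cid: Cid; result: var string) =
  var m = MemStore(s)
  if not m.blocks.hasKey cid:
    raise newMissingObject cid
  result = m.blocks[cid]

proc newMemStore(): MemStore =
  MemStore(putImpl: memPut, getImpl: memGet,
    blocks: initTable[Cid, string]())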
proc putDag*(s: IpldStore; dag: Dag): Future[Cid] {.async.} =
proc putDag*(s: IpldStore; dag: Dag): Cid =
## Place an IPLD node in the store.
assert(not s.putImpl.isNil)
let
raw = dag.toBinary
cid = raw.CidSha256(MulticodecTag.DagCbor)
discard await s.putImpl(s, raw)
result = cid
var raw = dag.toBinary
discard s.putImpl(s, raw)
raw.CidSha256(MulticodecTag.DagCbor)
proc getDag*(s: IpldStore; cid: Cid): Future[Dag] {.async.} =
proc getDag*(s: IpldStore; cid: Cid): Dag =
## Retrieve an IPLD node from the store.
assert cid.isValid
assert(not s.getImpl.isNil)
let raw = await s.getImpl(s, cid)
assert(not raw.isNil)
result = parseDag raw
proc fileStream*(s: IpldStore; cid: Cid; fut: FutureStream[string]): Future[void] {.async, deprecated.} =
## Asynchronously stream a file from a CID list.
## TODO: doesn't need to be a file, can be a raw CID or
## a DAG that is simply a list of other CIDs.
if not s.fileStreamImpl.isNil:
# use an optimized implementation
await s.fileStreamImpl(s, cid, fut)
else:
# use the simple implementation
if cid.isRaw:
let blk = await s.get(cid)
await fut.write(blk)
elif cid.isDagCbor:
let dag = await s.getDag(cid)
for link in dag["links"].items:
let subCid = link["cid"].getBytes.parseCid
await fileStream(s, subCid, fut)
else:
discard
parseDag s.get(cid)
type
FileStore* = ref FileStoreObj
## A store that writes nodes and leaves as files.
FileStoreObj = object of IpldStoreObj
root: string
AsyncFileStore* = ref AsyncFileStoreObj
## A store that writes nodes and leaves as files.
AsyncFileStoreObj = object of AsyncIpldStoreObj
root: string
proc parentAndFile(fs: FileStore; cid: Cid): (string, string) {.deprecated.} =
proc parentAndFile(fs: FileStore|AsyncFileStore; cid: Cid): (string, string) =
## Generate the parent path and file path of CID within the store.
let digest = hex.encode(cid.digest)
var hashType: string
@ -91,7 +101,21 @@ proc parentAndFile(fs: FileStore; cid: Cid): (string, string) {.deprecated.} =
result[0] = fs.root / hashType / digest[0..1]
result[1] = result[0] / digest[2..digest.high]
proc putToFile(fs: FileStore; cid: Cid; blk: string) {.async.} =
proc fsPut(s: IpldStore; blk: string): Cid =
var fs = FileStore(s)
result = blk.CidSha256
let (dir, path) = fs.parentAndFile result
if not existsDir dir:
createDir dir
if not existsFile path:
let
tmp = fs.root / "tmp"
writeFile(tmp, blk)
moveFile(tmp, path)
proc fsPutAsync(s: AsyncIpldStore; blk: string): Future[Cid] {.async.} =
var fs = AsyncFileStore(s)
let cid = blk.CidSha256
let (dir, path) = fs.parentAndFile cid
if not existsDir dir:
createDir dir
@ -102,61 +126,73 @@ proc putToFile(fs: FileStore; cid: Cid; blk: string) {.async.} =
await file.write(blk)
close file
moveFile(tmp, path)
result = cid
proc fsPut(s: IpldStore; blk: string): Future[Cid] {.async.} =
var fs = FileStore(s)
let cid = blk.CidSha256
await fs.putToFile(cid, blk)
proc fsGet(s: IpldStore; cid: Cid): Future[string] =
result = newFuture[string]("fsGet")
proc fsGetBuffer(s: IpldStore; cid: Cid; buf: pointer; len: Natural): int =
var fs = FileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if fSize > MaxBlockSize:
discard tryRemoveFile path
raise cid.newMissingObject
if fSize > len.int64:
raise newException(BufferTooSmall, "block is larger than the provided buffer")
let file = open(path, fmRead)
let blk = file.readAll()
result = file.readBuffer(buf, len)
close file
if result == 0:
raise cid.newMissingObject
proc fsGet(s: IpldStore; cid: Cid; result: var string) =
var fs = FileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if fSize > MaxBlockSize:
discard tryRemoveFile path
raise cid.newMissingObject
result.setLen fSize.int
let
file = open(path, fmRead)
n = file.readChars(result, 0, result.len)
close file
doAssert(n == result.len)
else:
raise cid.newMissingObject
proc fsGetAsync(s: AsyncIpldStore; cid: Cid): Future[string] {.async.} =
var fs = AsyncFileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let
file = openAsync(path, fmRead)
blk = await file.readAll()
close file
if cid.verify(blk):
result.complete blk
result = blk
else:
discard tryRemoveFile path
# bad block, remove it
if not result.finished:
result.fail cid.newMissingObject
proc fsFileStreamRecurs(fs: FileStore; cid: Cid; fut: FutureStream[string]) {.async.} =
if cid.isRaw:
let (_, path) = fs.parentAndFile cid
if existsFile path:
let
file = openAsync(path, fmRead)
while true:
let data = await file.read(4000)
if data.len == 0:
break
await fut.write(data)
close file
elif cid.isDagCbor:
let dag = await fs.getDag(cid)
doAssert(not dag.isNil)
doAssert(dag.contains("links"), $dag & " does not contain 'links'")
for link in dag.items:
let cid = link["cid"].getBytes.parseCid
await fs.fsFileStreamRecurs(cid, fut)
raise cid.newMissingObject
else:
doAssert(false)
discard
proc fsFileStream(s: IpldStore; cid: Cid; fut: FutureStream[string]) {.async.} =
var fs = FileStore(s)
await fs.fsFileStreamRecurs(cid, fut)
complete fut
raise cid.newMissingObject
proc newFileStore*(root: string): FileStore =
## Blocks retrieved by `get` are not hashed or verified.
if not existsDir(root):
createDir root
new result
result.putImpl = fsPut
result.getBufferImpl = fsGetBuffer
result.getImpl = fsGet
result.fileStreamImpl = fsFileStream
result.root = root
proc newAsyncFileStore*(root: string): AsyncFileStore =
## Every block retrieved by `get` is hashed and verified.
if not existsDir(root):
createDir root
new result
result.putImpl = fsPutAsync
result.getImpl = fsGetAsync
result.root = root

View File

@ -1,4 +1,4 @@
import asyncdispatch, strutils, multiformats, streams, tables, cbor, os, hex, math
import strutils, multiformats, streams, tables, cbor, os, hex, math
import ipld, ipldstore
@ -74,11 +74,24 @@ const
DirTag* = 0xda3c80 ## CBOR tag for UnixFS directories
FileTag* = 0xda3c81 ## CBOR tag for UnixFS files
proc isUnixfs*(bin: string): bool =
## Check if a string contains a UnixFS node
## in CBOR form.
var
s = newStringStream bin
c: CborParser
try:
c.open s
c.next
if c.kind == CborEventKind.cborTag:
result = c.tag == DirTag or c.tag == FileTag
except ValueError: discard
close s
proc toCbor*(u: UnixFsNode): CborNode =
case u.kind
of fileNode:
if u.links.isNil:
raiseAssert "cannot encode single-chunk files"
doAssert(not u.links.isNil, "cannot encode single-chunk files")
let array = newCborArray()
array.seq.setLen u.links.len
for i in 0..u.links.high:
@ -127,7 +140,7 @@ proc parseUnixfs*(raw: string; cid: Cid): UnixFsNode =
open(c, newStringStream(raw))
next c
parseAssert(c.kind == CborEventKind.cborTag, "data not tagged")
let tag = c.parseTag
let tag = c.tag
if tag == FileTag:
result.kind = fileNode
next c
@ -141,7 +154,7 @@ proc parseUnixfs*(raw: string; cid: Cid): UnixFsNode =
for _ in 1..nAttrs:
next c
parseAssert(c.kind == CborEventKind.cborPositive, "link map key not an integer")
let key = c.parseInt.EntryKey
let key = c.readInt.EntryKey
next c
case key
of typeKey:
@ -152,7 +165,7 @@ proc parseUnixfs*(raw: string; cid: Cid): UnixFsNode =
result.links[i].cid = buf.parseCid
of sizeKey:
parseAssert(c.kind == CborEventKind.cborPositive, "link size not encoded properly")
result.links[i].size = c.parseInt
result.links[i].size = c.readInt
result.size.inc result.links[i].size
elif tag == DirTag:
result.kind = dirNode
@ -175,10 +188,10 @@ proc parseUnixfs*(raw: string; cid: Cid): UnixFsNode =
for i in 1 .. nAttrs:
next c
parseAssert(c.kind == CborEventKind.cborPositive)
case c.parseInt.EntryKey
case c.readInt.EntryKey
of typeKey:
next c
case c.parseInt.UnixFsType
case c.readInt.UnixFsType
of ufsFile: entry.kind = shallowFile
of ufsDir: entry.kind = shallowDir
of dataKey:
@ -187,7 +200,7 @@ proc parseUnixfs*(raw: string; cid: Cid): UnixFsNode =
entry.cid = buf.parseCid
of sizeKey:
next c
entry.size = c.parseInt
entry.size = c.readInt
else:
parseAssert(false, raw)
next c
@ -228,13 +241,13 @@ proc lookupFile*(dir: UnixFsNode; name: string): tuple[cid: Cid, size: BiggestIn
result.cid = f.cid
result.size = f.size
proc addFile*(store: IpldStore; path: string): Future[UnixFsNode] {.async.} =
proc addFile*(store: IpldStore; path: string): UnixFsNode =
## Add a file to the store and return a UnixfsNode.
let
fStream = newFileStream(path, fmRead)
u = newUnixfsFile()
for cid, chunk in fStream.simpleChunks:
discard await store.put(chunk)
discard store.put(chunk)
if u.links.isNil:
u.links = newSeqOfCap[FileLink](1)
u.links.add FileLink(cid: cid, size: chunk.len)
@ -247,46 +260,46 @@ proc addFile*(store: IpldStore; path: string): Future[UnixFsNode] {.async.} =
# take a shortcut and use the raw chunk CID
u.cid = u.links[0].cid
else:
u.cid = await store.putDag(u.toCbor)
u.cid = store.putDag(u.toCbor)
result = u
close fStream
proc addDir*(store: IpldStore; dirPath: string): Future[UnixFsNode] {.async.} =
proc addDir*(store: IpldStore; dirPath: string): UnixFsNode =
var dRoot = newUnixFsRoot()
for kind, path in walkDir dirPath:
var child: UnixFsNode
case kind
of pcFile:
child = waitFor store.addFile path
child = store.addFile path
of pcDir:
child = waitFor store.addDir(path)
child = store.addDir(path)
else: continue
dRoot.add path.extractFilename, child
let
dag = dRoot.toCbor
cid = await store.putDag(dag)
cid = store.putDag(dag)
result = newUnixfsDir(cid)
proc open*(store: IpldStore; cid: Cid): Future[UnixfsNode] {.async.} =
proc open*(store: IpldStore; cid: Cid): UnixfsNode =
assert cid.isValid
assert(not cid.isRaw)
let raw = await store.get(cid)
let raw = store.get(cid)
result = parseUnixfs(raw, cid)
proc openDir*(store: IpldStore; cid: Cid): Future[UnixfsNode] {.async.} =
proc openDir*(store: IpldStore; cid: Cid): UnixfsNode =
assert cid.isValid
let raw = await store.get(cid)
assert(not raw.isNil)
var raw = ""
try: store.get(cid, raw)
except MissingObject: raise cid.newMissingObject
# this sucks
result = parseUnixfs(raw, cid)
assert(result.kind == dirNode)
proc walk*(store: IpldStore; dir: UnixfsNode; path: string; cache = true): Future[UnixfsNode] {.async.} =
proc walk*(store: IpldStore; dir: UnixfsNode; path: string; cache = true): UnixfsNode =
## Walk a path down a root.
assert dir.cid.isValid
assert(path != "")
assert(dir.kind == dirNode)
result = dir
var raw = ""
for name in split(path, DirSep):
if name == "": continue
if result.kind == fileNode:
@ -297,40 +310,46 @@ proc walk*(store: IpldStore; dir: UnixfsNode; path: string; cache = true): Futur
result = nil
break
if (next.kind in {shallowFile, shallowDir}) and (not next.cid.isRaw):
let raw = await store.get(next.cid)
store.get(next.cid, raw)
next = parseUnixfs(raw, next.cid)
if cache:
result.entries[name] = next
result = next
iterator fileChunks*(store: IpldStore; file: UnixfsNode): Future[string] =
#[
iterator fileChunks*(store: IpldStore; file: UnixfsNode): string =
## Iterate over the links in a file and yield chunk data.
if file.cid.isRaw:
yield store.get(file.cid)
else:
var i = 0
var
i = 0
chunk = ""
while i < file.links.len:
yield store.get(file.links[i].cid)
store.get(file.links[i].cid, chunk)
yield chunk
inc i
]#
proc readBuffer*(store: IpldStore; file: UnixfsNode; pos: BiggestInt;
buf: pointer; size: int): Future[int] {.async.} =
buf: pointer; size: int): int =
## Read a UnixFS file into a buffer. May return zero for any failure.
assert(pos > -1)
var
filePos = 0
chunk = ""
if pos < file.size:
if file.cid.isRaw:
let pos = pos.int
var blk = await store.get(file.cid)
if pos < blk.high:
copyMem(buf, blk[pos].addr, min(blk.len - pos, size))
store.get(file.cid, chunk)
if pos < chunk.high:
let n = min(chunk.len - pos, size)
copyMem(buf, chunk[pos].addr, n)
result = n
else:
for i in 0..file.links.high:
let linkSize = file.links[i].size
if filePos <= pos and pos < filePos+linkSize:
var chunk = await store.get(file.links[i].cid)
store.get(file.links[i].cid, chunk)
let
chunkPos = int(pos - filePos)
n = min(chunk.len-chunkPos, size)
@ -339,7 +358,7 @@ proc readBuffer*(store: IpldStore; file: UnixfsNode; pos: BiggestInt;
break
filePos.inc linkSize
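
A sketch of reading the head of a stored file through this call, assuming `store` and a file CID `fileCid`:

let file = store.open(fileCid)
var buf = newString(4096)
let n = store.readBuffer(file, 0, buf[0].addr, buf.len)
buf.setLen n  # trim to the bytes actually copied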
proc path(fs: FileStore; cid: Cid): string =
proc fileStorePath(cid: Cid): string =
## Generate the file path of a CID within the store.
assert cid.isValid
let digest = hex.encode(cid.digest)
@ -355,24 +374,24 @@ proc path(fs: FileStore; cid: Cid): string =
raise newException(SystemError, "unhandled hash type")
result = hashType / digest[0..1] / digest[2..digest.high]
proc dumpPaths*(paths: var seq[string]; store: FileStore; cid: Cid) =
proc dumpPaths*(paths: var seq[string]; store: IpldStore; cid: Cid) =
## Recursively dump the constituent FileStore chunk files of a CID to a string seq.
## TODO: use CBOR tags rather than reconstitute UnixFS nodes.
paths.add store.path(cid)
paths.add cid.fileStorePath
if cid.isDagCbor:
let u = waitFor store.open(cid)
let u = store.open(cid)
case u.kind:
of fileNode:
assert(not u.links.isNil)
for i in 0..u.links.high:
paths.add store.path(u.links[i].cid)
paths.add u.links[i].cid.fileStorePath
of dirNode:
for _, child in u.items:
paths.dumpPaths(store, child.cid)
else:
raiseAssert "cannot dump shallow nodes"
iterator dumpPaths*(store: FileStore; cid: Cid): string =
iterator dumpPaths*(store: IpldStore; cid: Cid): string =
var collector = newSeq[string]()
collector.dumpPaths(store, cid)
for p in collector: