Functional inserts

Ehmry - 2018-12-24 21:19:03 +01:00
parent e2177dc849
commit f01bf5083a
19 changed files with 499 additions and 489 deletions

.gitignore

@@ -1,2 +1,2 @@
nimcache
blobset
/blobset
/tests/test_set

src/blobset.nim

@@ -3,7 +3,7 @@ when not isMainModule:
import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin
import cbor
import ./blobsets, ./blobsets/stores
import ./blobsets, ./blobsets/filestores
when defined(genode):
import dagfsclient
@@ -26,7 +26,8 @@ proc dumpMain() =
writeLine(stderr, "failed to dump '", args[i], "', ", getCurrentExceptionMsg())
quit(-1)
proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: string) =
proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string): BlobSet =
result = bs
try:
case kind
of pcFile, pcLinkToFile:
@@ -34,11 +35,11 @@ proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: strin
let (id, size) = store.ingestFile(path)
path.removePrefix(getCurrentDir())
path.removePrefix("/")
set.insert(path, id, size)
result = result.insert(path, id, size)
writeLine(stdout, id, align($size, 11), " ", path)
of pcDir, pcLinkToDir:
for kind, subPath in path.walkDir:
set.insertPath(store, kind, subPath)
result = store.insertPath(result, kind, subPath)
except:
let e = getCurrentException()
writeLine(stderr, "failed to ingest '", path, "', ", e.msg)
@@ -54,7 +55,7 @@ proc ingestMain() =
let store = newFileStore("/tmp/blobs")
for i in 1..args.high:
let path = normalizedPath args[i]
set.insertPath(store, path.getFileInfo.kind, path)
set = store.insertPath(set, path.getFileInfo.kind, path)
let final = store.commit set
writeLine(stdout, final.setId)
@@ -125,8 +126,8 @@ type
proc print(a: Atom; s: Stream)
proc print(ast: NodeRef; s: Stream)
proc newAtom(x: tuple[id: BlobId, size: BiggestInt]): Atom =
Atom(kind: atomBlob, blob: x.id, size: x.size)
proc newAtom(id: BlobId, size: BiggestInt): Atom =
Atom(kind: atomBlob, blob: id, size: size)
proc newAtom(bs: BlobSet): Atom =
Atom(kind: atomSet, bs: bs)
@@ -218,7 +219,7 @@ proc getSet(env: Env; path: string): BlobSet=
result = env.sets.getOrDefault(path)
if result.isNil:
result = newBlobSet()
result.insertPath(env.store, path.getFileInfo.kind, path)
result = env.store.insertPath(result, path.getFileInfo.kind, path)
if not result.isEmpty:
env.sets[path] = result
@@ -443,14 +444,26 @@ proc keyFunc(env: Env; args: NodeObj): NodeRef =
args.atom.str.toKey.newAtom.newNode
proc ingestFunc(env: Env; args: NodeObj): NodeRef =
var bs = newBlobSet()
var bs: BlobSet
for n in args.walk:
bs = env.store.union(bs, env.getSet(n.atom.path))
if bs.isNil:
bs = env.getSet(n.atom.path)
else:
bs = env.store.union(bs, env.getSet(n.atom.path))
result = bs.newAtom.newNode
proc insertFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 3)
let
trie = args.atom
name = args.next.atom
blob = args.next.next.atom
newNode(newAtom(env.store.insert(trie.bs, name.str, blob.blob, blob.size)))
proc blobFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 1)
newNode(newAtom(env.getBlob args.atom.path))
let (id, size) = env.getBlob args.atom.path
newNode(newAtom(id, size))
proc listFunc(env: Env; args: NodeObj): NodeRef =
## Standard Lisp 'list' function.
@@ -461,21 +474,6 @@ proc listFunc(env: Env; args: NodeObj): NodeRef =
while not result.tailRef.nextRef.isNil:
result.tailRef = result.tailRef.nextRef
#[
proc lsFunc(env: Env; args: NodeObj): NodeRef =
result = newNodeList()
for n in args.walk:
let
a = n.atom
ufsNode = env.getUnixfs a.cid
if ufsNode.isDir:
for name, u in ufsNode.items:
let e = newNodeList()
e.append u.cid.newAtom.newNode
e.append name.newAtomString.newNode
result.append e
]#
proc mapFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
result = newNodeList()
@@ -499,23 +497,34 @@ proc mergeFunc(env: Env; args: NodeObj): NodeRef =
proc pathFunc(env: Env; arg: NodeObj): NodeRef =
result = arg.atom.str.newAtomPath.newNode
#[
proc rootFunc(env: Env; args: NodeObj): NodeRef =
var root = newFsRoot()
proc removeFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
let
name = args.atom.str
cid = args.next.atom.cid
ufs = env.getUnixfs cid
root.add(name, ufs)
let rootCid = env.store.putDag(root.toCbor)
rootCid.newAtom.newNode
]#
bs = args.atom.bs
name = args.next.atom.str
newNode(newAtom(env.store.remove(bs, name)))
proc searchFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
var found: NodeRef
let
bs = args.atom.bs
name = args.next.atom.str
apply(env.store, bs, name) do (id: BlobId; size: BiggestInt):
found = newNode(newAtom(id, size))
if found.isNil:
result = newNodeList()
else:
result = found
proc unionFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2)
let bs = env.store.union(args.atom.bs, args.next.atom.bs)
bs.newAtom.newNode
var bs: BlobSet
for n in args.walk:
if bs.isNil:
bs = n.atom.bs
else:
bs = env.store.union(bs, n.atom.bs)
result = bs.newAtom.newNode
##
# Environment
@@ -539,15 +548,15 @@ proc newEnv(store: BlobStore): Env =
#result.bindEnv "copy", copyFunc
result.bindEnv "define", defineFunc
result.bindEnv "glob", globFunc
result.bindEnv "key", keyFunc
result.bindEnv "ingest", ingestFunc
result.bindEnv "insert", insertFunc
result.bindEnv "key", keyFunc
result.bindEnv "list", listFunc
#result.bindEnv "ls", lsFunc
result.bindEnv "map", mapFunc
#result.bindEnv "merge", mergeFunc
result.bindEnv "path", pathFunc
#result.bindEnv "root", rootFunc
#result.bindEnv "walk", walkFunc
result.bindEnv "remove", removeFunc
result.bindEnv "search", searchFunc
result.bindEnv "union", unionFunc
proc eval(ast: NodeRef; env: Env): NodeRef
@@ -597,8 +606,6 @@ proc eval(ast: NodeRef; env: Env): NodeRef =
newNodeError(getCurrentExceptionMsg(), input)
except FieldError:
newNodeError("invalid argument", input)
except MissingChunk:
newNodeError("chunk not in store", input)
except OSError:
newNodeError(getCurrentExceptionMsg(), input)
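Taken together, the changes to the command-line tool replace in-place mutation of the set with threading the returned value through each call. A minimal sketch of the resulting pattern, assuming the package-level import names and reusing the /tmp/blobs store root from ingestMain (the ingested paths are placeholders):

import blobsets, blobsets/filestores

let store = newFileStore("/tmp/blobs")
var bs = newBlobSet()
for path in ["a.txt", "b.txt"]:
  let (id, size) = store.ingestFile(path)
  bs = bs.insert(path, id, size)  # insert returns a new set; the previous `bs` is unchanged
let final = store.commit bs       # freeze hot nodes into cold, content-addressed nodes
echo final.setId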

src/blobsets.nim

@@ -1,6 +1,7 @@
import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians
import base58/bitcoin, cbor, siphash
import ./blobsets/priv/hex
import std/streams, std/strutils
import nimcrypto, nimcrypto/blake2
@@ -209,17 +210,6 @@ func compactIndex(t: BlobSet; x: Key): int =
func masked(t: BlobSet; x: Key): bool =
((t.bitmap shr x.sparseIndex) and 1) != 0
proc apply*(bs: BlobSet; cb: proc (leaf: BlobSet)) =
## Apply a callback to each set element.
for node in bs.table:
case node.kind
of hotNode:
apply(node, cb)
of leafNode:
cb(node)
else:
raiseAssert("cannot apply to node type " & $node.kind)
func isEmpty*(s: BlobSet): bool = s.bitmap == Key(0)
## Test if a set is empty.
@@ -255,39 +245,111 @@ func search*(t: BlobSet; name: string): BlobId =
else:
raise newException(KeyError, "blob set does not contain key")
func insert(t, l: BlobSet; depth: int) =
func apply(bs: BlobSet; cb: proc (leaf: BlobSet)) =
## Apply a callback to each set element.
for node in bs.table:
if node.isNil:
raiseAssert(bs.table.repr)
case node.kind
of hotNode:
apply(node, cb)
of leafNode:
cb(node)
else:
raiseAssert("cannot apply to node type " & $node.kind)
func apply*(t: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
## Apply a procedure to a named blob, if it is present
var
t = t
key = name.toKey
while true:
assert(key != 0, "keyspace exhausted during search")
if t.masked(key):
t = t.table[t.compactIndex(key)]
if t.kind == leafNode:
f(t.blob, t.size)
break
key = key shr keyChunkBits
else:
break
func contains*(bs: BlobSet; name: string): bool =
var found = false
apply(bs, name) do (id: BlobId; size: BiggestInt):
found = true
result = found
func insert(trie, l: BlobSet; depth: int): BlobSet =
## This procedure is recursive to a depth of keyBits/keyChunkBits.
# TODO: not functional?
doAssert(depth < (keyBits div keyChunkBits), "key space exhausted during insert")
result = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table)
let key = l.key shr (depth * keyChunkBits)
if t.masked(key):
if result.masked(key):
let
depth = depth + 1
i = t.compactIndex(key)
case t.table[i].kind
i = result.compactIndex(key)
case result.table[i].kind
of hotNode:
t.table[i].insert(l, depth)
result.table[i] = insert(result.table[i], l, depth)
of coldNode:
raiseAssert("cannot insert into cold node")
of leafNode:
if t.table[i].key == l.key:
if result.table[i].key == l.key:
raise newException(KeyError, "key collision in blob set")
let
subtrei = newBlobSet()
subtrei.insert(t.table[i], depth)
subtrei.insert(l, depth)
t.table[i] = subtrei
var subtrie = newBlobSet()
subtrie = subtrie.insert(result.table[i], depth)
subtrie = subtrie.insert(l, depth)
result.table[i] = subtrie
else:
t.bitmap = t.bitmap or (Key(1) shl key.sparseIndex)
t.table.insert(l, t.compactIndex(key))
result.bitmap = result.bitmap or (Key(1) shl key.sparseIndex)
result.table.insert(l, result.compactIndex(key))
func insert*(t, l: BlobSet) = insert(t, l, 0)
# Insert set node `t` into `l`.
func insert*(trie, node: BlobSet): BlobSet = insert(trie, node, 0)
## Insert set node `node` into `trie`.
func insert*(t: BlobSet; name: string; blob: BlobId; size: BiggestInt) =
func insert*(t: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
## Insert a blob hash into a trie.
# TODO: this is not functional!
let leaf = BlobSet(kind: leafNode, key: name.toKey, blob: blob, size: size)
insert(t, leaf)
func remove(trie: BlobSet; key: Key; depth: int): BlobSet =
result = trie
let key = key shr (depth * keyChunkBits)
if trie.masked(key):
let
depth = depth + 1
i = trie.compactIndex(key)
case trie.table[i].kind
of hotNode:
let newTrie = remove(trie.table[i], key, depth)
if newTrie != trie.table[i]:
if newTrie.isNil:
if trie.table.len == 1:
result = nil
else:
result = newBlobSet()
for j in trie.table.low..trie.table.high:
if j == i: continue
result = insert(result, trie.table[j], depth)
of coldNode:
raiseAssert("cannot remove from cold node")
of leafNode:
if trie.table.len == 1:
result = nil
func remove*(trie: BlobSet; name: string): BlobSet =
## Remove a blob from a trie.
if trie.isEmpty:
result = trie
else:
let key = name.toKey
result = remove(trie, key, 0)
if result.isNil:
result = newBlobSet()
func toCbor*(x: BlobSet): CborNode =
const
nodeTag = 0
@@ -313,3 +375,195 @@ func toCbor*(x: BlobSet): CborNode =
array.add x.blob.data
array.add x.size
newCborTag(leafTag, array)
func leafCount*(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize
type
BlobKind* = enum
dataBlob, metaBlob
BlobStream* = ref BlobStreamObj
BlobStreamObj* = object of RootObj
closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.}
readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.}
IngestStream* = ref IngestStreamObj
IngestStreamObj* = object of RootObj
finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.}
ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.}
proc close*(s: BlobStream) =
assert(not s.closeImpl.isNil)
s.closeImpl(s)
proc read*(s: BlobStream; buf: pointer; len: Natural): int =
assert(not s.readImpl.isNil)
result = s.readImpl(s, buf, len)
proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
## Finish ingest stream
assert(not s.finishImpl.isNil)
s.finishImpl(s)
proc ingest*(s: IngestStream; buf: pointer; size: Natural) =
## Ingest stream
assert(not s.ingestImpl.isNil)
s.ingestImpl(s, buf, size)
proc ingest*(s: IngestStream; buf: var string) =
## Ingest stream
assert(not s.ingestImpl.isNil)
s.ingestImpl(s, buf[0].addr, buf.len)
type
BlobStore* = ref BlobStoreObj
BlobStoreObj* = object of RootObj
closeImpl*: proc (s: BlobStore) {.nimcall, gcsafe.}
openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream {.nimcall, gcsafe.}
openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream {.nimcall, gcsafe.}
proc close*(s: BlobStore) =
## Close active store resources.
if not s.closeImpl.isNil: s.closeImpl(s)
proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
## Return a new `BlobStream` for reading a blob.
assert(not s.openBlobStreamImpl.isNil)
s.openBlobStreamImpl(s, id, size, kind)
proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
## Return a new `IngestStream` for ingesting a blob.
assert(not s.openIngestStreamImpl.isNil)
s.openIngestStreamImpl(s, size, kind)
func compressTree*(leaves: var seq[BlobId]) =
var
ctx: Blake2b256
nodeOffset = 0
nodeDepth = 0
while leaves.len > 1:
nodeOffset = 0
inc nodeDepth
var pos, next: int
while pos < leaves.len:
ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = nodeOffset
params.nodeDepth = nodeDepth
inc nodeOffset
ctx.update(leaves[pos].data)
inc pos
if pos < leaves.len:
ctx.update(leaves[pos].data)
inc pos
leaves[next] = ctx.finish()
inc next
leaves.setLen(next)
# TODO: BLAKE2 tree finalization flags
iterator dumpBlob*(store: BlobStore; id: BlobId): string =
var
stream = store.openBlobStream(id, kind=dataBlob)
buf = newString(blobLeafSize)
defer:
close stream
while true:
buf.setLen(blobLeafSize)
let n = stream.read(buf[0].addr, buf.len)
if n == 0:
break
buf.setLen(n)
yield buf
proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
assert(bs.kind == hotNode)
for e in bs.table.mitems:
case e.kind
of coldNode, leafNode: discard
of hotNode:
e = store.commit e
let stream = store.openIngestStream(kind=metaBlob)
var buf = encode bs.toCbor
stream.ingest(buf)
let (id, _) = finish stream
result = BlobSet(kind: coldNode, setId: id)
proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
# TODO: lazy-load set
bs.apply(name, f)
proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
# TODO: lazy-load set
insert(bs, name, blob, size)
proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
# TODO: lazy-load set
remove(bs, name)
proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
## Return the union of `sets`.
# TODO: lazy-load set
var fresh = newBlobSet()
proc freshInsert(leaf: BlobSet) =
fresh = insert(fresh, leaf)
for bs in sets:
assert(not bs.isnil)
bs.apply(freshInsert)
result = fresh
# Store implementations
#
type
NullIngestStream = ref NullIngestStreamObj
NullIngestStreamObj = object of IngestStreamObj
ctx: Blake2b256
leaves: seq[BlobId]
pos, nodeOffset: BiggestInt
proc nullBlobClose(s: BlobStream) = discard
proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0
proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead)
proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
var s = NullIngestStream(s)
s.leaves.add finish(s.ctx)
compressTree(s.leaves)
result.id = s.leaves[0]
result.size = s.pos
proc nullIngest(s: IngestStream; buf: pointer; len: Natural) =
var
s = NullIngestStream(s)
off = 0
buf = cast[ptr array[blobLeafSize, byte]](buf)
while off < len:
var n = min(blobLeafSize, len-off)
let leafOff = int(s.pos and blobLeafSizeMask)
if leafOff == 0:
if s.pos > 0:
s.leaves.add finish(s.ctx)
s.ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = s.nodeOffset
inc s.nodeOffset
else:
n = min(n, blobLeafSize-leafOff)
s.ctx.update(buf[off].addr, n)
off.inc n
s.pos.inc n
proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
NullIngestStream(
finishImpl: nullFinish, ingestImpl: nullIngest, leaves: newSeq[BlobId]())
proc newNullStore*(): BlobStore =
BlobStore(
openBlobStreamImpl: nullOpenBlobStream,
openIngestStreamImpl: nullOpenIngestStream)
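The pure set operations above can be exercised without a store. A minimal sketch of the functional insert, mirroring the new test suite further below and assuming the module is importable as blobsets (the key names and the blob id derived from a zero-filled string are illustrative only):

import blobsets

let blob = dagHash(newString(maxChunkSize))
let a = newBlobSet()
let b = a.insert("foo", blob, 0)
let c = b.insert("bar", blob, 0)
assert b.contains("foo") and not b.contains("bar")  # `b` is not mutated by the later insert
assert c.contains("foo") and c.contains("bar")
assert c.search("foo") == blob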

src/blobsets/filestores.nim (new file)

@@ -0,0 +1,117 @@
import ../blobsets
import std/asyncfile, std/asyncdispatch, std/os
import nimcrypto/blake2
proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] =
## Ingest a file and return blob metadata.
let
file = openAsync(path, fmRead)
fileSize = file.getFileSize
defer:
close file
let stream = store.openIngestStream(fileSize, dataBlob)
if fileSize > 0:
var buf = newString(min(blobLeafSize, fileSize))
while true:
let n = waitFor file.readBuffer(buf[0].addr, buf.len)
if n == 0: break
stream.ingest(buf[0].addr, n)
result = finish stream
type
FsBlobStream = ref FsBlobStreamObj
FsBlobStreamObj = object of BlobStreamObj
path: string
file: AsyncFile
FsIngestStream = ref FsIngestStreamObj
FsIngestStreamObj = object of IngestStreamObj
ctx: Blake2b256
leaves: seq[BlobId]
path: string
file: AsyncFile
pos, nodeOffset: BiggestInt
FileStore* = ref FileStoreObj
## A store that writes nodes and leafs as files.
FileStoreObj = object of BlobStoreObj
root, buf: string
proc fsBlobClose(s: BlobStream) =
var s = FsBlobStream(s)
close s.file
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
var s = FsBlobStream(s)
result = waitFor s.file.readBuffer(buffer, len)
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
var fs = FileStore(s)
let stream = FsBlobStream()
result = stream
stream.path = fs.root / $id
stream.file = openAsync(stream.path, fmRead)
stream.closeImpl = fsBlobClose
stream.readImpl = fsBlobRead
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
var s = FsIngestStream(s)
close s.file
s.leaves.add finish(s.ctx)
compressTree(s.leaves)
result.id = s.leaves[0]
result.size = s.pos
moveFile(s.path, s.path.parentDir / $(result.id))
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
var
s = FsIngestStream(s)
off = 0
buf = cast[ptr array[blobLeafSize, byte]](buf)
while off < len:
var n = min(blobLeafSize, len-off)
let leafOff = int(s.pos and blobLeafSizeMask)
if leafOff == 0:
if s.pos > 0:
s.leaves.add finish(s.ctx)
s.ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = s.nodeOffset
inc s.nodeOffset
else:
n = min(n, blobLeafSize-leafOff)
s.ctx.update(buf[off].addr, n)
waitFor s.file.writeBuffer(buf[off].addr, n)
off.inc n
s.pos.inc n
proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
var fs = FileStore(s)
let stream = FsIngestStream()
result = stream
stream.finishImpl = fsFinish
stream.ingestImpl = fsIngest
case kind
of dataBlob:
stream.path = fs.root / "data" / "ingest"
of metaBlob:
stream.path = fs.root / "blob" / "ingest"
stream.file = openAsync(stream.path, fmWrite)
if size > 0:
stream.file.setFileSize(size)
stream.leaves = newSeqOfCap[BlobId](leafCount size)
else:
stream.leaves = newSeq[BlobId]()
proc newFileStore*(root = "/"): FileStore =
if not existsDir(root):
createDir root
new result
result.openBlobStreamImpl = fsOpenBlobStream
result.openIngestStreamImpl = fsOpenIngestStream
result.root = root
result.buf = ""

src/blobsets/stores.nim (deleted file)

@@ -1,414 +0,0 @@
import std/streams, std/strutils, std/os
import std/asyncfile, std/asyncdispatch
import cbor
import ../blobsets, ./priv/hex
import nimcrypto/blake2
type
MissingChunk* = ref object of CatchableError
cid*: Cid ## Missing chunk identifier
BufferTooSmall* = object of CatchableError
template raiseMissing*(cid: Cid) =
raise MissingChunk(msg: "chunk missing from store", cid: cid)
func leafCount(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize
type
BlobStream* = ref BlobStreamObj
BlobStreamObj = object of RootObj
closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.}
readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.}
IngestStream* = ref IngestStreamObj
IngestStreamObj = object of RootObj
finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.}
ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.}
proc close*(s: BlobStream) =
assert(not s.closeImpl.isNil)
s.closeImpl(s)
proc read*(s: BlobStream; buf: pointer; len: Natural): int =
assert(not s.readImpl.isNil)
result = s.readImpl(s, buf, len)
proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
## Finish ingest stream
assert(not s.finishImpl.isNil)
s.finishImpl(s)
proc ingest*(s: IngestStream; buf: pointer; size: Natural) =
## Ingest stream
assert(not s.ingestImpl.isNil)
s.ingestImpl(s, buf, size)
proc ingest*(s: IngestStream; buf: var string) =
## Ingest stream
assert(not s.ingestImpl.isNil)
s.ingestImpl(s, buf[0].addr, buf.len)
type
BlobStore* = ref BlobStoreObj
BlobStoreObj* = object of RootObj
closeImpl*: proc (s: BlobStore) {.nimcall, gcsafe.}
putBufferImpl*: proc (s: BlobStore; buf: pointer; len: Natural): Cid {.nimcall, gcsafe.}
putImpl*: proc (s: BlobStore; chunk: string): Cid {.nimcall, gcsafe.}
getBufferImpl*: proc (s: BlobStore; cid: Cid; buf: pointer; len: Natural): int {.nimcall, gcsafe.}
getImpl*: proc (s: BlobStore; cid: Cid; result: var string) {.nimcall, gcsafe.}
openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt): BlobStream {.nimcall, gcsafe.}
openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt): IngestStream {.nimcall, gcsafe.}
proc close*(s: BlobStore) =
## Close active store resources.
if not s.closeImpl.isNil: s.closeImpl(s)
proc putBuffer*(s: BlobStore; buf: pointer; len: Natural): Cid =
## Put a chunk into the store.
assert(0 < len and len <= maxChunkSize)
assert(not s.putBufferImpl.isNil)
s.putBufferImpl(s, buf, len)
proc put*(s: BlobStore; chunk: string): Cid =
## Place a raw block to the store. The hash argument specifies a required
## hash algorithm, or defaults to a algorithm choosen by the store
## implementation.
assert(0 < chunk.len and chunk.len <= maxChunkSize)
assert(not s.putImpl.isNil)
s.putImpl(s, chunk)
proc getBuffer*(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int =
## Copy a raw block from the store into a buffer pointer.
assert(0 < len)
assert(not s.getBufferImpl.isNil)
result = s.getBufferImpl(s, cid, buf, len)
assert(0 < result)
proc get*(s: BlobStore; cid: Cid; result: var string) =
## Retrieve a raw block from the store.
assert(not s.getImpl.isNil)
s.getImpl(s, cid, result)
assert(result.len > 0)
proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt): BlobStream =
## Return a new `BlobStream` for reading a blob.
assert(not s.openBlobStreamImpl.isNil)
s.openBlobStreamImpl(s, id, size)
proc openIngestStream*(s: BlobStore; size = 0.BiggestInt): IngestStream =
## Return a new `IngestStream` for ingesting a blob.
assert(not s.openIngestStreamImpl.isNil)
s.openIngestStreamImpl(s, size)
proc get*(s: BlobStore; cid: Cid): string =
## Retrieve a raw block from the store.
result = ""
s.get(cid, result)
proc putDag*(s: BlobStore; dag: CborNode): Cid =
## Place an Dagfs node in the store.
var raw = encode dag
s.put raw
proc getDag*(s: BlobStore; cid: Cid): CborNode =
## Retrieve an CBOR DAG from the store.
let stream = newStringStream(s.get(cid))
result = parseCbor stream
close stream
type
FileStore* = ref FileStoreObj
## A store that writes nodes and leafs as files.
FileStoreObj = object of BlobStoreObj
root, buf: string
proc parentAndFile(fs: FileStore; cid: Cid): (string, string) =
## Generate the parent path and file path of CID within the store.
let digest = hex.encode(cid.data)
result[0] = fs.root / digest[0..1]
result[1] = result[0] / digest[2..digest.high]
proc fsPutBuffer(s: BlobStore; buf: pointer; len: Natural): Cid =
var fs = FileStore(s)
result = dagHash(buf, len)
if result != zeroChunk:
let (dir, path) = fs.parentAndFile(result)
if not existsDir dir:
createDir dir
if not existsFile path:
fs.buf.setLen(len)
copyMem(addr fs.buf[0], buf, fs.buf.len)
let
tmp = fs.root / "tmp"
writeFile(tmp, fs.buf)
moveFile(tmp, path)
proc fsPut(s: BlobStore; chunk: string): Cid =
var fs = FileStore(s)
result = dagHash chunk
if result != zeroChunk:
let (dir, path) = fs.parentAndFile(result)
if not existsDir dir:
createDir dir
if not existsFile path:
let
tmp = fs.root / "tmp"
writeFile(tmp, chunk)
moveFile(tmp, path)
proc fsGetBuffer(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int =
var fs = FileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if maxChunkSize < fSize:
discard tryRemoveFile path
raiseMissing cid
if len.int64 < fSize:
raise newException(BufferTooSmall, "file is $1 bytes, buffer is $2" % [$fSize, $len])
let file = open(path, fmRead)
result = file.readBuffer(buf, len)
close file
if result == 0:
raiseMissing cid
proc fsGet(s: BlobStore; cid: Cid; result: var string) =
var fs = FileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if fSize > maxChunkSize:
discard tryRemoveFile path
raiseMissing cid
result.setLen fSize.int
let
file = open(path, fmRead)
n = file.readChars(result, 0, result.len)
close file
doAssert(n == result.len)
else:
raiseMissing cid
func compressTree(leaves: var seq[BlobId]) =
var
ctx: Blake2b256
nodeOffset = 0
nodeDepth = 0
while leaves.len > 1:
nodeOffset = 0
inc nodeDepth
var pos, next: int
while pos < leaves.len:
ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = nodeOffset
params.nodeDepth = nodeDepth
inc nodeOffset
ctx.update(leaves[pos].data)
inc pos
if pos < leaves.len:
ctx.update(leaves[pos].data)
inc pos
leaves[next] = ctx.finish()
inc next
leaves.setLen(next)
# TODO: BLAKE2 tree finalization flags
iterator dumpBlob*(store: BlobStore; id: BlobId): string =
var
stream = store.openBlobStream(id)
buf = newString(blobLeafSize)
defer:
close stream
while true:
buf.setLen(blobLeafSize)
let n = stream.read(buf[0].addr, buf.len)
if n == 0:
break
buf.setLen(n)
yield buf
proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] =
## Ingest a file and return blob metadata.
let
file = openAsync(path, fmRead)
fileSize = file.getFileSize
defer:
close file
let stream = store.openIngestStream(fileSize)
if fileSize > 0:
var buf = newString(min(blobLeafSize, fileSize))
while true:
let n = waitFor file.readBuffer(buf[0].addr, buf.len)
if n == 0: break
stream.ingest(buf[0].addr, n)
result = finish stream
proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
assert(bs.kind == hotNode)
for e in bs.table.mitems:
case e.kind
of coldNode, leafNode: discard
of hotNode:
e = store.commit e
let stream = store.openIngestStream()
var buf = encode bs.toCbor
stream.ingest(buf)
let (id, _) = finish stream
result = BlobSet(kind: coldNode, setId: id)
proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
## Return the union of `sets`.
let fresh = newBlobSet()
result = fresh
proc freshInsert(leaf: BlobSet) = insert(fresh, leaf)
for bs in sets:
bs.apply(freshInsert)
# Store implementations
#
type
FsBlobStream = ref FsBlobStreamObj
FsBlobStreamObj = object of BlobStreamObj
path: string
file: AsyncFile
NullIngestStream = ref NullIngestStreamObj
NullIngestStreamObj = object of IngestStreamObj
ctx: Blake2b256
leaves: seq[BlobId]
pos, nodeOffset: BiggestInt
FsIngestStream = ref FsIngestStreamObj
FsIngestStreamObj = object of IngestStreamObj
ctx: Blake2b256
leaves: seq[BlobId]
path: string
file: AsyncFile
pos, nodeOffset: BiggestInt
proc nullBlobClose(s: BlobStream) = discard
proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0
proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream =
BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead)
proc fsBlobClose(s: BlobStream) =
var s = FsBlobStream(s)
close s.file
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
var s = FsBlobStream(s)
result = waitFor s.file.readBuffer(buffer, len)
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream =
var fs = FileStore(s)
let stream = FsBlobStream()
result = stream
stream.path = fs.root / $id
stream.file = openAsync(stream.path, fmRead)
stream.closeImpl = fsBlobClose
stream.readImpl = fsBlobRead
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
var s = FsIngestStream(s)
close s.file
s.leaves.add finish(s.ctx)
compressTree(s.leaves)
result.id = s.leaves[0]
result.size = s.pos
moveFile(s.path, s.path.parentDir / $(result.id))
proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
var s = NullIngestStream(s)
s.leaves.add finish(s.ctx)
compressTree(s.leaves)
result.id = s.leaves[0]
result.size = s.pos
proc nullIngest(s: IngestStream; buf: pointer; len: Natural) =
var
s = NullIngestStream(s)
off = 0
buf = cast[ptr array[blobLeafSize, byte]](buf)
while off < len:
var n = min(blobLeafSize, len-off)
let leafOff = int(s.pos and blobLeafSizeMask)
if leafOff == 0:
if s.pos > 0:
s.leaves.add finish(s.ctx)
s.ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = s.nodeOffset
inc s.nodeOffset
else:
n = min(n, blobLeafSize-leafOff)
s.ctx.update(buf[off].addr, n)
off.inc n
s.pos.inc n
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
var
s = FsIngestStream(s)
off = 0
buf = cast[ptr array[blobLeafSize, byte]](buf)
while off < len:
var n = min(blobLeafSize, len-off)
let leafOff = int(s.pos and blobLeafSizeMask)
if leafOff == 0:
if s.pos > 0:
s.leaves.add finish(s.ctx)
s.ctx.init do (params: var Blake2bParams):
params.fanout = 2
params.depth = 255
params.leafLength = blobLeafSize
params.nodeOffset = s.nodeOffset
inc s.nodeOffset
else:
n = min(n, blobLeafSize-leafOff)
s.ctx.update(buf[off].addr, n)
waitFor s.file.writeBuffer(buf[off].addr, n)
off.inc n
s.pos.inc n
proc nullOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream =
NullIngestStream(
finishImpl: nullFinish, ingestImpl: nullIngest, leaves: newSeq[BlobId]())
proc fsOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream =
var fs = FileStore(s)
let stream = FsIngestStream()
result = stream
stream.finishImpl = fsFinish
stream.ingestImpl = fsIngest
stream.path = fs.root / "ingest"
stream.file = openAsync(stream.path, fmWrite)
if size > 0:
stream.file.setFileSize(size)
stream.leaves = newSeqOfCap[BlobId](leafCount size)
else:
stream.leaves = newSeq[BlobId]()
proc newNullStore*(): BlobStore =
BlobStore(
openBlobStreamImpl: nullOpenBlobStream,
openIngestStreamImpl: nullOpenIngestStream)
proc newFileStore*(root: string): FileStore =
if not existsDir(root):
createDir root
new result
result.putBufferImpl = fsPutBuffer
result.putImpl = fsPut
result.getBufferImpl = fsGetBuffer
result.getImpl = fsGet
result.openBlobStreamImpl = fsOpenBlobStream
result.openIngestStreamImpl = fsOpenIngestStream
result.root = root
result.buf = ""

tests/test_set.nim (new file)

@@ -0,0 +1,46 @@
import std/unittest, std/os, std/parseopt
import ../src/blobsets
suite "Blob set tests":
var
randomCid = dagHash(newString(maxChunkSize))
# test "zero blob":
# doAssert(randomCid == zeroChunk)
proc randomize() =
randomCid = dagHash(randomCid.data.addr, randomCid.data.len)
proc testPath(s: BlobSet; root: string): BlobSet =
result = s
for path in walkDirRec(root):
randomize()
let
blob = randomCid
str = $randomCid
doAssert(str.toBlobid == randomCid)
result = insert(result, path, blob, 0)
let found = result.search(path)
doAssert(found == randomCid)
test "functional insert":
let
a = newBlobSet()
b = insert(a, "foo", randomCid, 0)
c = insert(b, "bar", randomCid, 0)
doAssert(contains(b, "foo"))
doAssert(contains(c, "foo"))
doAssert(contains(c, "bar"))
doAssert(not contains(a, "foo"))
doAssert(not contains(a, "bar"))
doAssert(not contains(b, "bar"))
test "sets":
var s = newBlobSet()
for kind, key, val in getopt():
if kind == cmdArgument:
s = s.testPath(key)
if s.isEmpty:
s = s.testPath(".")
echo s.leafCount, " leaves in ", s.nodeCount, " nodes"