From f01bf5083a5fe318def054bec687f520a2e17ddf Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Mon, 24 Dec 2018 21:19:03 +0100 Subject: [PATCH] Functional inserts --- .gitignore | 4 +- .../blobsets_genode.nimble} | 0 .../src/blobset_fs_store.nim} | 0 .../src/blobset_rom.nim} | 0 .../src/dagfs_client.h | 0 .../src/dagfs_client.nim | 0 .../src/dagfs_fs.nim | 0 .../src/dagfs_server.h | 0 .../src/dagfs_server.nim | 0 .../src/dagfs_session.nim | 0 .../src/dagfs_tcp_store.nim | 0 .../src/filesystemsession.nim | 0 .../src/fs_component.h | 0 .../src/rom_component.h | 0 src/blobset.nim | 99 +++-- src/blobsets.nim | 308 +++++++++++-- src/blobsets/filestores.nim | 117 +++++ src/blobsets/stores.nim | 414 ------------------ tests/test_set.nim | 46 ++ 19 files changed, 499 insertions(+), 489 deletions(-) rename genode/{dagfs_genode/dagfs_genode.nimble => blobsets_genode/blobsets_genode.nimble} (100%) rename genode/{dagfs_genode/src/dagfs_fs_store.nim => blobsets_genode/src/blobset_fs_store.nim} (100%) rename genode/{dagfs_genode/src/dagfs_rom.nim => blobsets_genode/src/blobset_rom.nim} (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_client.h (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_client.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_fs.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_server.h (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_server.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_session.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/dagfs_tcp_store.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/filesystemsession.nim (100%) rename genode/{dagfs_genode => blobsets_genode}/src/fs_component.h (100%) rename genode/{dagfs_genode => blobsets_genode}/src/rom_component.h (100%) create mode 100644 src/blobsets/filestores.nim delete mode 100644 src/blobsets/stores.nim create mode 100644 tests/test_set.nim diff --git a/.gitignore b/.gitignore index 0fcee89..f36d27f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -nimcache -blobset +/blobset +/tests/test_set diff --git a/genode/dagfs_genode/dagfs_genode.nimble b/genode/blobsets_genode/blobsets_genode.nimble similarity index 100% rename from genode/dagfs_genode/dagfs_genode.nimble rename to genode/blobsets_genode/blobsets_genode.nimble diff --git a/genode/dagfs_genode/src/dagfs_fs_store.nim b/genode/blobsets_genode/src/blobset_fs_store.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_fs_store.nim rename to genode/blobsets_genode/src/blobset_fs_store.nim diff --git a/genode/dagfs_genode/src/dagfs_rom.nim b/genode/blobsets_genode/src/blobset_rom.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_rom.nim rename to genode/blobsets_genode/src/blobset_rom.nim diff --git a/genode/dagfs_genode/src/dagfs_client.h b/genode/blobsets_genode/src/dagfs_client.h similarity index 100% rename from genode/dagfs_genode/src/dagfs_client.h rename to genode/blobsets_genode/src/dagfs_client.h diff --git a/genode/dagfs_genode/src/dagfs_client.nim b/genode/blobsets_genode/src/dagfs_client.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_client.nim rename to genode/blobsets_genode/src/dagfs_client.nim diff --git a/genode/dagfs_genode/src/dagfs_fs.nim b/genode/blobsets_genode/src/dagfs_fs.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_fs.nim rename to genode/blobsets_genode/src/dagfs_fs.nim diff --git 
a/genode/dagfs_genode/src/dagfs_server.h b/genode/blobsets_genode/src/dagfs_server.h similarity index 100% rename from genode/dagfs_genode/src/dagfs_server.h rename to genode/blobsets_genode/src/dagfs_server.h diff --git a/genode/dagfs_genode/src/dagfs_server.nim b/genode/blobsets_genode/src/dagfs_server.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_server.nim rename to genode/blobsets_genode/src/dagfs_server.nim diff --git a/genode/dagfs_genode/src/dagfs_session.nim b/genode/blobsets_genode/src/dagfs_session.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_session.nim rename to genode/blobsets_genode/src/dagfs_session.nim diff --git a/genode/dagfs_genode/src/dagfs_tcp_store.nim b/genode/blobsets_genode/src/dagfs_tcp_store.nim similarity index 100% rename from genode/dagfs_genode/src/dagfs_tcp_store.nim rename to genode/blobsets_genode/src/dagfs_tcp_store.nim diff --git a/genode/dagfs_genode/src/filesystemsession.nim b/genode/blobsets_genode/src/filesystemsession.nim similarity index 100% rename from genode/dagfs_genode/src/filesystemsession.nim rename to genode/blobsets_genode/src/filesystemsession.nim diff --git a/genode/dagfs_genode/src/fs_component.h b/genode/blobsets_genode/src/fs_component.h similarity index 100% rename from genode/dagfs_genode/src/fs_component.h rename to genode/blobsets_genode/src/fs_component.h diff --git a/genode/dagfs_genode/src/rom_component.h b/genode/blobsets_genode/src/rom_component.h similarity index 100% rename from genode/dagfs_genode/src/rom_component.h rename to genode/blobsets_genode/src/rom_component.h diff --git a/src/blobset.nim b/src/blobset.nim index 879297f..e1e3a55 100644 --- a/src/blobset.nim +++ b/src/blobset.nim @@ -3,7 +3,7 @@ when not isMainModule: import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin import cbor -import ./blobsets, ./blobsets/stores +import ./blobsets, ./blobsets/filestores when defined(genode): import dagfsclient @@ -26,7 +26,8 @@ proc dumpMain() = writeLine(stderr, "failed to dump '", args[i], "', ", getCurrentExceptionMsg()) quit(-1) -proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: string) = +proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string): BlobSet = + result = bs try: case kind of pcFile, pcLinkToFile: @@ -34,11 +35,11 @@ proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: strin let (id, size) = store.ingestFile(path) path.removePrefix(getCurrentDir()) path.removePrefix("/") - set.insert(path, id, size) + result = result.insert(path, id, size) writeLine(stdout, id, align($size, 11), " ", path) of pcDir, pcLinkToDir: for kind, subPath in path.walkDir: - set.insertPath(store, kind, subPath) + result = store.insertPath(result, kind, subPath) except: let e = getCurrentException() writeLine(stderr, "failed to ingest '", path, "', ", e.msg) @@ -54,7 +55,7 @@ proc ingestMain() = let store = newFileStore("/tmp/blobs") for i in 1..args.high: let path = normalizedPath args[i] - set.insertPath(store, path.getFileInfo.kind, path) + set = store.insertPath(set, path.getFileInfo.kind, path) let final = store.commit set writeLine(stdout, final.setId) @@ -125,8 +126,8 @@ type proc print(a: Atom; s: Stream) proc print(ast: NodeRef; s: Stream) -proc newAtom(x: tuple[id: BlobId, size: BiggestInt]): Atom = - Atom(kind: atomBlob, blob: x.id, size: x.size) +proc newAtom(id: BlobId, size: BiggestInt): Atom = + Atom(kind: atomBlob, blob: id, size: size) proc newAtom(bs: BlobSet): Atom = 
Atom(kind: atomSet, bs: bs) @@ -218,7 +219,7 @@ proc getSet(env: Env; path: string): BlobSet= result = env.sets.getOrDefault(path) if result.isNil: result = newBlobSet() - result.insertPath(env.store, path.getFileInfo.kind, path) + result = env.store.insertPath(result, path.getFileInfo.kind, path) if not result.isEmpty: env.sets[path] = result @@ -443,14 +444,26 @@ proc keyFunc(env: Env; args: NodeObj): NodeRef = args.atom.str.toKey.newAtom.newNode proc ingestFunc(env: Env; args: NodeObj): NodeRef = - var bs = newBlobSet() + var bs: BlobSet for n in args.walk: - bs = env.store.union(bs, env.getSet(n.atom.path)) + if bs.isNil: + bs = env.getSet(n.atom.path) + else: + bs = env.store.union(bs, env.getSet(n.atom.path)) result = bs.newAtom.newNode +proc insertFunc(env: Env; args: NodeObj): NodeRef = + assertArgCount(args, 3) + let + trie = args.atom + name = args.next.atom + blob = args.next.next.atom + newNode(newAtom(env.store.insert(trie.bs, name.str, blob.blob, blob.size))) + proc blobFunc(env: Env; args: NodeObj): NodeRef = assertArgCount(args, 1) - newNode(newAtom(env.getBlob args.atom.path)) + let (id, size) = env.getBlob args.atom.path + newNode(newAtom(id, size)) proc listFunc(env: Env; args: NodeObj): NodeRef = ## Standard Lisp 'list' function. @@ -461,21 +474,6 @@ proc listFunc(env: Env; args: NodeObj): NodeRef = while not result.tailRef.nextRef.isNil: result.tailRef = result.tailRef.nextRef -#[ -proc lsFunc(env: Env; args: NodeObj): NodeRef = - result = newNodeList() - for n in args.walk: - let - a = n.atom - ufsNode = env.getUnixfs a.cid - if ufsNode.isDir: - for name, u in ufsNode.items: - let e = newNodeList() - e.append u.cid.newAtom.newNode - e.append name.newAtomString.newNode - result.append e -]# - proc mapFunc(env: Env; args: NodeObj): NodeRef = assertArgCount(args, 2) result = newNodeList() @@ -499,23 +497,34 @@ proc mergeFunc(env: Env; args: NodeObj): NodeRef = proc pathFunc(env: Env; arg: NodeObj): NodeRef = result = arg.atom.str.newAtomPath.newNode -#[ -proc rootFunc(env: Env; args: NodeObj): NodeRef = - var root = newFsRoot() +proc removeFunc(env: Env; args: NodeObj): NodeRef = + assertArgCount(args, 2) let - name = args.atom.str - cid = args.next.atom.cid - ufs = env.getUnixfs cid - root.add(name, ufs) - let rootCid = env.store.putDag(root.toCbor) - rootCid.newAtom.newNode -]# + bs = args.atom.bs + name = args.next.atom.str + newNode(newAtom(env.store.remove(bs, name))) + +proc searchFunc(env: Env; args: NodeObj): NodeRef = + assertArgCount(args, 2) + var found: NodeRef + let + bs = args.atom.bs + name = args.next.atom.str + apply(env.store, bs, name) do (id: BlobId; size: BiggestInt): + found = newNode(newAtom(id, size)) + if found.isNil: + result = newNodeList() + else: + result = found proc unionFunc(env: Env; args: NodeObj): NodeRef = - assertArgCount(args, 2) - let bs = env.store.union(args.atom.bs, args.next.atom.bs) - bs.newAtom.newNode - + var bs: BlobSet + for n in args.walk: + if bs.isNil: + bs = n.atom.bs + else: + bs = env.store.union(bs, n.atom.bs) + result = bs.newAtom.newNode ## # Environment @@ -539,15 +548,15 @@ proc newEnv(store: BlobStore): Env = #result.bindEnv "copy", copyFunc result.bindEnv "define", defineFunc result.bindEnv "glob", globFunc - result.bindEnv "key", keyFunc result.bindEnv "ingest", ingestFunc + result.bindEnv "insert", insertFunc + result.bindEnv "key", keyFunc result.bindEnv "list", listFunc - #result.bindEnv "ls", lsFunc result.bindEnv "map", mapFunc #result.bindEnv "merge", mergeFunc result.bindEnv "path", pathFunc - 
#result.bindEnv "root", rootFunc - #result.bindEnv "walk", walkFunc + result.bindEnv "remove", removeFunc + result.bindEnv "search", searchFunc result.bindEnv "union", unionFunc proc eval(ast: NodeRef; env: Env): NodeRef @@ -597,8 +606,6 @@ proc eval(ast: NodeRef; env: Env): NodeRef = newNodeError(getCurrentExceptionMsg(), input) except FieldError: newNodeError("invalid argument", input) - except MissingChunk: - newNodeError("chunk not in store", input) except OSError: newNodeError(getCurrentExceptionMsg(), input) diff --git a/src/blobsets.nim b/src/blobsets.nim index a97f09e..3542d22 100644 --- a/src/blobsets.nim +++ b/src/blobsets.nim @@ -1,6 +1,7 @@ import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians import base58/bitcoin, cbor, siphash import ./blobsets/priv/hex +import std/streams, std/strutils import nimcrypto, nimcrypto/blake2 @@ -209,17 +210,6 @@ func compactIndex(t: BlobSet; x: Key): int = func masked(t: BlobSet; x: Key): bool = ((t.bitmap shr x.sparseIndex) and 1) != 0 -proc apply*(bs: BlobSet; cb: proc (leaf: BlobSet)) = - ## Apply a callback to each set element. - for node in bs.table: - case node.kind - of hotNode: - apply(node, cb) - of leafNode: - cb(node) - else: - raiseAssert("cannot apply to node type " & $node.kind) - func isEmpty*(s: BlobSet): bool = s.bitmap == Key(0) ## Test if a set is empty. @@ -255,39 +245,111 @@ func search*(t: BlobSet; name: string): BlobId = else: raise newException(KeyError, "blob set does not contain key") -func insert(t, l: BlobSet; depth: int) = +func apply(bs: BlobSet; cb: proc (leaf: BlobSet)) = + ## Apply a callback to each set element. + for node in bs.table: + if node.isNil: + raiseAssert(bs.table.repr) + case node.kind + of hotNode: + apply(node, cb) + of leafNode: + cb(node) + else: + raiseAssert("cannot apply to node type " & $node.kind) + +func apply*(t: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) = + ## Apply a procedure to a named blob, if it is present + var + t = t + key = name.toKey + while true: + assert(key != 0, "keyspace exhausted during search") + if t.masked(key): + t = t.table[t.compactIndex(key)] + if t.kind == leafNode: + f(t.blob, t.size) + break + key = key shr keyChunkBits + else: + break + +func contains*(bs: BlobSet; name: string): bool = + var found = false + apply(bs, name) do (id: BlobId; size: BiggestInt): + found = true + result = found + +func insert(trie, l: BlobSet; depth: int): BlobSet = ## This procedure is recursive to a depth of keyBits/keyChunkBits. + # TODO: not functional? 
doAssert(depth < (keyBits div keyChunkBits), "key space exhausted during insert") + result = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table) let key = l.key shr (depth * keyChunkBits) - if t.masked(key): + if result.masked(key): let depth = depth + 1 - i = t.compactIndex(key) - case t.table[i].kind + i = result.compactIndex(key) + case result.table[i].kind of hotNode: - t.table[i].insert(l, depth) + result.table[i] = insert(result.table[i], l, depth) of coldNode: raiseAssert("cannot insert into cold node") of leafNode: - if t.table[i].key == l.key: + if result.table[i].key == l.key: raise newException(KeyError, "key collision in blob set") - let - subtrei = newBlobSet() - subtrei.insert(t.table[i], depth) - subtrei.insert(l, depth) - t.table[i] = subtrei + var subtrie = newBlobSet() + subtrie = subtrie.insert(result.table[i], depth) + subtrie = subtrie.insert(l, depth) + result.table[i] = subtrie else: - t.bitmap = t.bitmap or (Key(1) shl key.sparseIndex) - t.table.insert(l, t.compactIndex(key)) + result.bitmap = result.bitmap or (Key(1) shl key.sparseIndex) + result.table.insert(l, result.compactIndex(key)) -func insert*(t, l: BlobSet) = insert(t, l, 0) - # Insert set node `t` into `l`. +func insert*(trie, node: BlobSet): BlobSet = insert(trie, node, 0) + ## Insert set node `node` into `trie`. -func insert*(t: BlobSet; name: string; blob: BlobId; size: BiggestInt) = +func insert*(t: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet = ## Insert a blob hash into a trie. + # TODO: this is not functional! let leaf = BlobSet(kind: leafNode, key: name.toKey, blob: blob, size: size) insert(t, leaf) +func remove(trie: BlobSet; key: Key; depth: int): BlobSet = + result = trie + let key = key shr (depth * keyChunkBits) + if trie.masked(key): + let + depth = depth + 1 + i = trie.compactIndex(key) + case trie.table[i].kind + of hotNode: + let newTrie = remove(trie.table[i], key, depth) + if newTrie != trie.table[i]: + if newTrie.isNil: + if trie.table.len == 1: + result = nil + else: + result = newBlobSet() + for j in trie.table.low..trie.table.high: + if j == i: continue + result = insert(result, newTrie, depth) + of coldNode: + raiseAssert("cannot remove from cold node") + of leafNode: + if trie.table.len == 1: + result = nil + +func remove*(trie: BlobSet; name: string): BlobSet = + ## Remove a blob from a trie. 
+ if trie.isEmpty: + result = trie + else: + let key = name.toKey + result = remove(trie, key, 0) + if result.isNil: + result = newBlobSet() + func toCbor*(x: BlobSet): CborNode = const nodeTag = 0 @@ -313,3 +375,195 @@ func toCbor*(x: BlobSet): CborNode = array.add x.blob.data array.add x.size newCborTag(leafTag, array) + +func leafCount*(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize + +type + BlobKind* = enum + dataBlob, metaBlob + + BlobStream* = ref BlobStreamObj + BlobStreamObj* = object of RootObj + closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.} + readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.} + IngestStream* = ref IngestStreamObj + IngestStreamObj* = object of RootObj + finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.} + ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.} + +proc close*(s: BlobStream) = + assert(not s.closeImpl.isNil) + s.closeImpl(s) + +proc read*(s: BlobStream; buf: pointer; len: Natural): int = + assert(not s.readImpl.isNil) + result = s.readImpl(s, buf, len) + +proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + ## Finish ingest stream + assert(not s.finishImpl.isNil) + s.finishImpl(s) + +proc ingest*(s: IngestStream; buf: pointer; size: Natural) = + ## Ingest stream + assert(not s.ingestImpl.isNil) + s.ingestImpl(s, buf, size) + +proc ingest*(s: IngestStream; buf: var string) = + ## Ingest stream + assert(not s.ingestImpl.isNil) + s.ingestImpl(s, buf[0].addr, buf.len) + +type + BlobStore* = ref BlobStoreObj + BlobStoreObj* = object of RootObj + closeImpl*: proc (s: BlobStore) {.nimcall, gcsafe.} + openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream {.nimcall, gcsafe.} + openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream {.nimcall, gcsafe.} + +proc close*(s: BlobStore) = + ## Close active store resources. + if not s.closeImpl.isNil: s.closeImpl(s) + +proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream = + ## Return a new `BlobStream` for reading a blob. + assert(not s.openBlobStreamImpl.isNil) + s.openBlobStreamImpl(s, id, size, kind) + +proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream = + ## Return a new `IngestStream` for ingesting a blob. 
+ assert(not s.openIngestStreamImpl.isNil) + s.openIngestStreamImpl(s, size, kind) + +func compressTree*(leaves: var seq[BlobId]) = + var + ctx: Blake2b256 + nodeOffset = 0 + nodeDepth = 0 + while leaves.len > 1: + nodeOffset = 0 + inc nodeDepth + var pos, next: int + while pos < leaves.len: + ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = nodeOffset + params.nodeDepth = nodeDepth + inc nodeOffset + ctx.update(leaves[pos].data) + inc pos + if pos < leaves.len: + ctx.update(leaves[pos].data) + inc pos + leaves[next] = ctx.finish() + inc next + leaves.setLen(next) + # TODO: BLAKE2 tree finalization flags + +iterator dumpBlob*(store: BlobStore; id: BlobId): string = + var + stream = store.openBlobStream(id, kind=dataBlob) + buf = newString(blobLeafSize) + defer: + close stream + while true: + buf.setLen(blobLeafSize) + let n = stream.read(buf[0].addr, buf.len) + if n == 0: + break + buf.setLen(n) + yield buf + +proc commit*(store: BlobStore; bs: BlobSet): BlobSet = + assert(bs.kind == hotNode) + for e in bs.table.mitems: + case e.kind + of coldNode, leafNode: discard + of hotNode: + e = store.commit e + let stream = store.openIngestStream(kind=metaBlob) + var buf = encode bs.toCbor + stream.ingest(buf) + let (id, _) = finish stream + result = BlobSet(kind: coldNode, setId: id) + +proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) = + # TODO: lazy-load set + bs.apply(name, f) + +proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet = + # TODO: lazy-load set + insert(bs, name, blob, size) + +proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet = + # TODO: lazy-load set + remove(bs, name) + +proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet = + ## Return the union of `sets`. 
+ # TODO: lazy-load set + var fresh = newBlobSet() + proc freshInsert(leaf: BlobSet) = + fresh = insert(fresh, leaf) + for bs in sets: + assert(not bs.isnil) + bs.apply(freshInsert) + result = fresh + +# Store implementations +# + +type + NullIngestStream = ref NullIngestStreamObj + NullIngestStreamObj = object of IngestStreamObj + ctx: Blake2b256 + leaves: seq[BlobId] + pos, nodeOffset: BiggestInt + +proc nullBlobClose(s: BlobStream) = discard + +proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0 + +proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = + BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead) + +proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + var s = NullIngestStream(s) + s.leaves.add finish(s.ctx) + compressTree(s.leaves) + result.id = s.leaves[0] + result.size = s.pos + +proc nullIngest(s: IngestStream; buf: pointer; len: Natural) = + var + s = NullIngestStream(s) + off = 0 + buf = cast[ptr array[blobLeafSize, byte]](buf) + while off < len: + var n = min(blobLeafSize, len-off) + let leafOff = int(s.pos and blobLeafSizeMask) + if leafOff == 0: + if s.pos > 0: + s.leaves.add finish(s.ctx) + s.ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = s.nodeOffset + inc s.nodeOffset + else: + n = min(n, blobLeafSize-leafOff) + s.ctx.update(buf[off].addr, n) + off.inc n + s.pos.inc n + +proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream = + NullIngestStream( + finishImpl: nullFinish, ingestImpl: nullIngest, leaves: newSeq[BlobId]()) + +proc newNullStore*(): BlobStore = + BlobStore( + openBlobStreamImpl: nullOpenBlobStream, + openIngestStreamImpl: nullOpenIngestStream) diff --git a/src/blobsets/filestores.nim b/src/blobsets/filestores.nim new file mode 100644 index 0000000..f6675e4 --- /dev/null +++ b/src/blobsets/filestores.nim @@ -0,0 +1,117 @@ +import ../blobsets + +import std/asyncfile, std/asyncdispatch, std/os + +import nimcrypto/blake2 + +proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] = + ## Ingest a file and return blob metadata. + let + file = openAsync(path, fmRead) + fileSize = file.getFileSize + defer: + close file + let stream = store.openIngestStream(fileSize, dataBlob) + if fileSize > 0: + var buf = newString(min(blobLeafSize, fileSize)) + while true: + let n = waitFor file.readBuffer(buf[0].addr, buf.len) + if n == 0: break + stream.ingest(buf[0].addr, n) + result = finish stream + +type + FsBlobStream = ref FsBlobStreamObj + FsBlobStreamObj = object of BlobStreamObj + path: string + file: AsyncFile + + FsIngestStream = ref FsIngestStreamObj + FsIngestStreamObj = object of IngestStreamObj + ctx: Blake2b256 + leaves: seq[BlobId] + path: string + file: AsyncFile + pos, nodeOffset: BiggestInt + + FileStore* = ref FileStoreObj + ## A store that writes nodes and leafs as files. 
+ FileStoreObj = object of BlobStoreObj + root, buf: string + +proc fsBlobClose(s: BlobStream) = + var s = FsBlobStream(s) + close s.file + +proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = + var s = FsBlobStream(s) + result = waitFor s.file.readBuffer(buffer, len) + +proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = + var fs = FileStore(s) + let stream = FsBlobStream() + result = stream + stream.path = fs.root / $id + stream.file = openAsync(stream.path, fmRead) + stream.closeImpl = fsBlobClose + stream.readImpl = fsBlobRead + +proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + var s = FsIngestStream(s) + close s.file + s.leaves.add finish(s.ctx) + compressTree(s.leaves) + result.id = s.leaves[0] + result.size = s.pos + moveFile(s.path, s.path.parentDir / $(result.id)) + +proc fsIngest(s: IngestStream; buf: pointer; len: Natural) = + var + s = FsIngestStream(s) + off = 0 + buf = cast[ptr array[blobLeafSize, byte]](buf) + while off < len: + var n = min(blobLeafSize, len-off) + let leafOff = int(s.pos and blobLeafSizeMask) + if leafOff == 0: + if s.pos > 0: + s.leaves.add finish(s.ctx) + s.ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = s.nodeOffset + inc s.nodeOffset + else: + n = min(n, blobLeafSize-leafOff) + s.ctx.update(buf[off].addr, n) + waitFor s.file.writeBuffer(buf[off].addr, n) + off.inc n + s.pos.inc n + +proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream = + var fs = FileStore(s) + let stream = FsIngestStream() + result = stream + stream.finishImpl = fsFinish + stream.ingestImpl = fsIngest + case kind + of dataBlob: + stream.path = fs.root / "data" / "ingest" + of metaBlob: + stream.path = fs.root / "blob" / "ingest" + stream.file = openAsync(stream.path, fmWrite) + if size > 0: + stream.file.setFileSize(size) + stream.leaves = newSeqOfCap[BlobId](leafCount size) + else: + stream.leaves = newSeq[BlobId]() + +proc newFileStore*(root = "/"): FileStore = + if not existsDir(root): + createDir root + new result + result.openBlobStreamImpl = fsOpenBlobStream + result.openIngestStreamImpl = fsOpenIngestStream + result.root = root + result.buf = "" diff --git a/src/blobsets/stores.nim b/src/blobsets/stores.nim deleted file mode 100644 index ac8d89c..0000000 --- a/src/blobsets/stores.nim +++ /dev/null @@ -1,414 +0,0 @@ -import std/streams, std/strutils, std/os -import std/asyncfile, std/asyncdispatch -import cbor -import ../blobsets, ./priv/hex - -import nimcrypto/blake2 - -type - MissingChunk* = ref object of CatchableError - cid*: Cid ## Missing chunk identifier - BufferTooSmall* = object of CatchableError - -template raiseMissing*(cid: Cid) = - raise MissingChunk(msg: "chunk missing from store", cid: cid) - -func leafCount(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize - -type - BlobStream* = ref BlobStreamObj - BlobStreamObj = object of RootObj - closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.} - readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.} - IngestStream* = ref IngestStreamObj - IngestStreamObj = object of RootObj - finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.} - ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.} - -proc close*(s: BlobStream) = - assert(not s.closeImpl.isNil) - s.closeImpl(s) - -proc read*(s: BlobStream; buf: pointer; 
len: Natural): int = - assert(not s.readImpl.isNil) - result = s.readImpl(s, buf, len) - -proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = - ## Finish ingest stream - assert(not s.finishImpl.isNil) - s.finishImpl(s) - -proc ingest*(s: IngestStream; buf: pointer; size: Natural) = - ## Ingest stream - assert(not s.ingestImpl.isNil) - s.ingestImpl(s, buf, size) - -proc ingest*(s: IngestStream; buf: var string) = - ## Ingest stream - assert(not s.ingestImpl.isNil) - s.ingestImpl(s, buf[0].addr, buf.len) - -type - BlobStore* = ref BlobStoreObj - BlobStoreObj* = object of RootObj - closeImpl*: proc (s: BlobStore) {.nimcall, gcsafe.} - putBufferImpl*: proc (s: BlobStore; buf: pointer; len: Natural): Cid {.nimcall, gcsafe.} - putImpl*: proc (s: BlobStore; chunk: string): Cid {.nimcall, gcsafe.} - getBufferImpl*: proc (s: BlobStore; cid: Cid; buf: pointer; len: Natural): int {.nimcall, gcsafe.} - getImpl*: proc (s: BlobStore; cid: Cid; result: var string) {.nimcall, gcsafe.} - openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt): BlobStream {.nimcall, gcsafe.} - openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt): IngestStream {.nimcall, gcsafe.} - -proc close*(s: BlobStore) = - ## Close active store resources. - if not s.closeImpl.isNil: s.closeImpl(s) - -proc putBuffer*(s: BlobStore; buf: pointer; len: Natural): Cid = - ## Put a chunk into the store. - assert(0 < len and len <= maxChunkSize) - assert(not s.putBufferImpl.isNil) - s.putBufferImpl(s, buf, len) - -proc put*(s: BlobStore; chunk: string): Cid = - ## Place a raw block to the store. The hash argument specifies a required - ## hash algorithm, or defaults to a algorithm choosen by the store - ## implementation. - assert(0 < chunk.len and chunk.len <= maxChunkSize) - assert(not s.putImpl.isNil) - s.putImpl(s, chunk) - -proc getBuffer*(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int = - ## Copy a raw block from the store into a buffer pointer. - assert(0 < len) - assert(not s.getBufferImpl.isNil) - result = s.getBufferImpl(s, cid, buf, len) - assert(0 < result) - -proc get*(s: BlobStore; cid: Cid; result: var string) = - ## Retrieve a raw block from the store. - assert(not s.getImpl.isNil) - s.getImpl(s, cid, result) - assert(result.len > 0) - -proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt): BlobStream = - ## Return a new `BlobStream` for reading a blob. - assert(not s.openBlobStreamImpl.isNil) - s.openBlobStreamImpl(s, id, size) - -proc openIngestStream*(s: BlobStore; size = 0.BiggestInt): IngestStream = - ## Return a new `IngestStream` for ingesting a blob. - assert(not s.openIngestStreamImpl.isNil) - s.openIngestStreamImpl(s, size) - -proc get*(s: BlobStore; cid: Cid): string = - ## Retrieve a raw block from the store. - result = "" - s.get(cid, result) - -proc putDag*(s: BlobStore; dag: CborNode): Cid = - ## Place an Dagfs node in the store. - var raw = encode dag - s.put raw - -proc getDag*(s: BlobStore; cid: Cid): CborNode = - ## Retrieve an CBOR DAG from the store. - let stream = newStringStream(s.get(cid)) - result = parseCbor stream - close stream - -type - FileStore* = ref FileStoreObj - ## A store that writes nodes and leafs as files. - FileStoreObj = object of BlobStoreObj - root, buf: string - -proc parentAndFile(fs: FileStore; cid: Cid): (string, string) = - ## Generate the parent path and file path of CID within the store. 
- let digest = hex.encode(cid.data) - result[0] = fs.root / digest[0..1] - result[1] = result[0] / digest[2..digest.high] - -proc fsPutBuffer(s: BlobStore; buf: pointer; len: Natural): Cid = - var fs = FileStore(s) - result = dagHash(buf, len) - if result != zeroChunk: - let (dir, path) = fs.parentAndFile(result) - if not existsDir dir: - createDir dir - if not existsFile path: - fs.buf.setLen(len) - copyMem(addr fs.buf[0], buf, fs.buf.len) - let - tmp = fs.root / "tmp" - writeFile(tmp, fs.buf) - moveFile(tmp, path) - -proc fsPut(s: BlobStore; chunk: string): Cid = - var fs = FileStore(s) - result = dagHash chunk - if result != zeroChunk: - let (dir, path) = fs.parentAndFile(result) - if not existsDir dir: - createDir dir - if not existsFile path: - let - tmp = fs.root / "tmp" - writeFile(tmp, chunk) - moveFile(tmp, path) - -proc fsGetBuffer(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int = - var fs = FileStore(s) - let (_, path) = fs.parentAndFile cid - if existsFile path: - let fSize = path.getFileSize - if maxChunkSize < fSize: - discard tryRemoveFile path - raiseMissing cid - if len.int64 < fSize: - raise newException(BufferTooSmall, "file is $1 bytes, buffer is $2" % [$fSize, $len]) - let file = open(path, fmRead) - result = file.readBuffer(buf, len) - close file - if result == 0: - raiseMissing cid - -proc fsGet(s: BlobStore; cid: Cid; result: var string) = - var fs = FileStore(s) - let (_, path) = fs.parentAndFile cid - if existsFile path: - let fSize = path.getFileSize - if fSize > maxChunkSize: - discard tryRemoveFile path - raiseMissing cid - result.setLen fSize.int - let - file = open(path, fmRead) - n = file.readChars(result, 0, result.len) - close file - doAssert(n == result.len) - else: - raiseMissing cid - -func compressTree(leaves: var seq[BlobId]) = - var - ctx: Blake2b256 - nodeOffset = 0 - nodeDepth = 0 - while leaves.len > 1: - nodeOffset = 0 - inc nodeDepth - var pos, next: int - while pos < leaves.len: - ctx.init do (params: var Blake2bParams): - params.fanout = 2 - params.depth = 255 - params.leafLength = blobLeafSize - params.nodeOffset = nodeOffset - params.nodeDepth = nodeDepth - inc nodeOffset - ctx.update(leaves[pos].data) - inc pos - if pos < leaves.len: - ctx.update(leaves[pos].data) - inc pos - leaves[next] = ctx.finish() - inc next - leaves.setLen(next) - # TODO: BLAKE2 tree finalization flags - -iterator dumpBlob*(store: BlobStore; id: BlobId): string = - var - stream = store.openBlobStream(id) - buf = newString(blobLeafSize) - defer: - close stream - while true: - buf.setLen(blobLeafSize) - let n = stream.read(buf[0].addr, buf.len) - if n == 0: - break - buf.setLen(n) - yield buf - -proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] = - ## Ingest a file and return blob metadata. 
- let - file = openAsync(path, fmRead) - fileSize = file.getFileSize - defer: - close file - let stream = store.openIngestStream(fileSize) - if fileSize > 0: - var buf = newString(min(blobLeafSize, fileSize)) - while true: - let n = waitFor file.readBuffer(buf[0].addr, buf.len) - if n == 0: break - stream.ingest(buf[0].addr, n) - result = finish stream - -proc commit*(store: BlobStore; bs: BlobSet): BlobSet = - assert(bs.kind == hotNode) - for e in bs.table.mitems: - case e.kind - of coldNode, leafNode: discard - of hotNode: - e = store.commit e - let stream = store.openIngestStream() - var buf = encode bs.toCbor - stream.ingest(buf) - let (id, _) = finish stream - result = BlobSet(kind: coldNode, setId: id) - -proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet = - ## Return the union of `sets`. - let fresh = newBlobSet() - result = fresh - proc freshInsert(leaf: BlobSet) = insert(fresh, leaf) - for bs in sets: - bs.apply(freshInsert) - -# Store implementations -# - -type - FsBlobStream = ref FsBlobStreamObj - FsBlobStreamObj = object of BlobStreamObj - path: string - file: AsyncFile - - NullIngestStream = ref NullIngestStreamObj - NullIngestStreamObj = object of IngestStreamObj - ctx: Blake2b256 - leaves: seq[BlobId] - pos, nodeOffset: BiggestInt - - FsIngestStream = ref FsIngestStreamObj - FsIngestStreamObj = object of IngestStreamObj - ctx: Blake2b256 - leaves: seq[BlobId] - path: string - file: AsyncFile - pos, nodeOffset: BiggestInt - -proc nullBlobClose(s: BlobStream) = discard - -proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0 - -proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream = - BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead) - -proc fsBlobClose(s: BlobStream) = - var s = FsBlobStream(s) - close s.file - -proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = - var s = FsBlobStream(s) - result = waitFor s.file.readBuffer(buffer, len) - -proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream = - var fs = FileStore(s) - let stream = FsBlobStream() - result = stream - stream.path = fs.root / $id - stream.file = openAsync(stream.path, fmRead) - stream.closeImpl = fsBlobClose - stream.readImpl = fsBlobRead - -proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = - var s = FsIngestStream(s) - close s.file - s.leaves.add finish(s.ctx) - compressTree(s.leaves) - result.id = s.leaves[0] - result.size = s.pos - moveFile(s.path, s.path.parentDir / $(result.id)) - -proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = - var s = NullIngestStream(s) - s.leaves.add finish(s.ctx) - compressTree(s.leaves) - result.id = s.leaves[0] - result.size = s.pos - -proc nullIngest(s: IngestStream; buf: pointer; len: Natural) = - var - s = NullIngestStream(s) - off = 0 - buf = cast[ptr array[blobLeafSize, byte]](buf) - while off < len: - var n = min(blobLeafSize, len-off) - let leafOff = int(s.pos and blobLeafSizeMask) - if leafOff == 0: - if s.pos > 0: - s.leaves.add finish(s.ctx) - s.ctx.init do (params: var Blake2bParams): - params.fanout = 2 - params.depth = 255 - params.leafLength = blobLeafSize - params.nodeOffset = s.nodeOffset - inc s.nodeOffset - else: - n = min(n, blobLeafSize-leafOff) - s.ctx.update(buf[off].addr, n) - off.inc n - s.pos.inc n - -proc fsIngest(s: IngestStream; buf: pointer; len: Natural) = - var - s = FsIngestStream(s) - off = 0 - buf = cast[ptr array[blobLeafSize, byte]](buf) - while off < len: - var n = min(blobLeafSize, len-off) 
- let leafOff = int(s.pos and blobLeafSizeMask) - if leafOff == 0: - if s.pos > 0: - s.leaves.add finish(s.ctx) - s.ctx.init do (params: var Blake2bParams): - params.fanout = 2 - params.depth = 255 - params.leafLength = blobLeafSize - params.nodeOffset = s.nodeOffset - inc s.nodeOffset - else: - n = min(n, blobLeafSize-leafOff) - s.ctx.update(buf[off].addr, n) - waitFor s.file.writeBuffer(buf[off].addr, n) - off.inc n - s.pos.inc n - -proc nullOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream = - NullIngestStream( - finishImpl: nullFinish, ingestImpl: nullIngest, leaves: newSeq[BlobId]()) - -proc fsOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream = - var fs = FileStore(s) - let stream = FsIngestStream() - result = stream - stream.finishImpl = fsFinish - stream.ingestImpl = fsIngest - stream.path = fs.root / "ingest" - stream.file = openAsync(stream.path, fmWrite) - if size > 0: - stream.file.setFileSize(size) - stream.leaves = newSeqOfCap[BlobId](leafCount size) - else: - stream.leaves = newSeq[BlobId]() - -proc newNullStore*(): BlobStore = - BlobStore( - openBlobStreamImpl: nullOpenBlobStream, - openIngestStreamImpl: nullOpenIngestStream) - -proc newFileStore*(root: string): FileStore = - if not existsDir(root): - createDir root - new result - result.putBufferImpl = fsPutBuffer - result.putImpl = fsPut - result.getBufferImpl = fsGetBuffer - result.getImpl = fsGet - result.openBlobStreamImpl = fsOpenBlobStream - result.openIngestStreamImpl = fsOpenIngestStream - result.root = root - result.buf = "" diff --git a/tests/test_set.nim b/tests/test_set.nim new file mode 100644 index 0000000..3ad150f --- /dev/null +++ b/tests/test_set.nim @@ -0,0 +1,46 @@ +import std/unittest, std/os, std/parseopt + +import ../src/blobsets + +suite "Blob set tests": + + var + randomCid = dagHash(newString(maxChunkSize)) + + # test "zero blob": + # doAssert(randomCid == zeroChunk) + + proc randomize() = + randomCid = dagHash(randomCid.data.addr, randomCid.data.len) + + proc testPath(s: BlobSet; root: string): BlobSet = + for path in walkDirRec(root): + randomize() + let + blob = randomCid + str = $randomCid + doAssert(str.toBlobid == randomCid) + result = insert(s, path, blob, 0) + let found = result.search(path) + doAssert(found == randomCid) + + test "functional insert": + let + a = newBlobSet() + b = insert(a, "foo", randomCid, 0) + c = insert(b, "bar", randomCid, 0) + doAssert(contains(b, "foo")) + doAssert(contains(c, "foo")) + doAssert(contains(c, "bar")) + doAssert(not contains(a, "foo")) + doAssert(not contains(a, "bar")) + doAssert(not contains(b, "bar")) + + test "sets": + var s = newBlobSet() + for kind, key, val in getopt(): + if kind == cmdArgument: + s = s.testPath(key) + if s.isEmpty: + s = s.testPath(".") + echo s.leafCount, " leaves in ", s.nodeCount, " nodes"
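
Usage sketch (not part of the commit): how the functional insert/remove API introduced above is meant to be driven, following tests/test_set.nim and the new "insert"/"remove" REPL builtins. The store root "/tmp/blobs", the file name "example.txt", and the flat import form are illustrative assumptions; everything else is the API added by this patch.

import blobsets, blobsets/filestores
  # in-tree the tests use ../src/blobsets; adjust to your layout

let store = newFileStore("/tmp/blobs")

# Ingest a file; ingestFile returns the blob hash and its size.
let (id, size) = store.ingestFile("example.txt")

# insert returns a fresh set; the original set is left untouched.
let
  a = newBlobSet()
  b = a.insert("example.txt", id, size)
assert(not a.contains("example.txt"))
assert(b.contains("example.txt"))

# apply looks up a named blob without raising when it is absent.
store.apply(b, "example.txt") do (id: BlobId; size: BiggestInt):
  echo id, " ", size

# remove likewise returns a new set rather than mutating.
let c = b.remove("example.txt")
assert(not c.contains("example.txt"))

# commit writes the hot trie into the store and returns a cold node.
let cold = store.commit(b)
echo "set: ", cold.setId

Because insert and remove now return new sets, callers have to rebind the result, which is exactly the change made to ingestMain and insertPath in src/blobset.nim (set = store.insertPath(set, ...)).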
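
A second sketch of the lower-level stream interface that moved from stores.nim into blobsets.nim, using the dependency-free null store; the payload string is illustrative only.

import blobsets

let store = newNullStore()

# Hash a buffer through an IngestStream; the null store computes the
# BLAKE2b tree hash over leaf-sized chunks but persists nothing.
var payload = "hello blobsets"
let stream = store.openIngestStream(payload.len.BiggestInt, dataBlob)
stream.ingest(payload)
let (id, n) = finish stream
echo id, " ", n

# Reading back: dumpBlob pulls leaf-sized chunks from a BlobStream.
# A FileStore would yield data here; the null store always reads zero bytes.
for chunk in store.dumpBlob(id):
  echo chunk.len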