From c5cc2f35a50264e7ae006e606273322ff3cd3af7 Mon Sep 17 00:00:00 2001
From: Emery Hemingway
Date: Thu, 27 Dec 2018 01:32:59 +0100
Subject: [PATCH] Load and commit

---
 src/blobset.nim             |  22 ++-
 src/blobsets.nim            | 279 ++++++++++++++++++++++--------------
 src/blobsets/filestores.nim |  10 +-
 3 files changed, 191 insertions(+), 120 deletions(-)

diff --git a/src/blobset.nim b/src/blobset.nim
index e1e3a55..663b7f7 100644
--- a/src/blobset.nim
+++ b/src/blobset.nim
@@ -5,8 +5,8 @@ import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std
 import cbor
 import ./blobsets, ./blobsets/filestores
 
-when defined(genode):
-  import dagfsclient
+#when defined(genode):
+#  import dagfsclient
 #else:
 #  import ./blobsets/tcp
 
@@ -43,7 +43,6 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
   except:
     let e = getCurrentException()
     writeLine(stderr, "failed to ingest '", path, "', ", e.msg)
-    # raise e
 
 proc ingestMain() =
   var args = newSeq[string]()
@@ -252,7 +251,11 @@ proc print(a: Atom; s: Stream) =
     s.write '|'
     s.write $a.size
   of atomSet:
-    s.write "«set»"
+    case a.bs.kind
+    of coldNode:
+      s.write $a.bs.setId
+    else:
+      s.write "«set»"
   of atomString:
     s.write '"'
     s.write a.str
@@ -390,7 +393,8 @@ proc cborFunc(env: Env; arg: NodeObj): NodeRef =
 
 proc commitFunc(env: Env; arg: NodeObj): NodeRef =
   assertArgCount(arg, 1)
-  raiseAssert("not implemented")
+  let cold = commit(env.store, arg.atom.bs)
+  cold.newAtom.newNode
 
 #[
 proc copyFunc(env: Env; args: NodeObj): NodeRef =
@@ -474,6 +478,11 @@ proc listFunc(env: Env; args: NodeObj): NodeRef =
     while not result.tailRef.nextRef.isNil:
       result.tailRef = result.tailRef.nextRef
 
+proc loadFunc(env: Env; args: NodeObj): NodeRef =
+  assertArgCount(args, 1)
+  let bs = env.store.loadSet args.atom.bs.setId
+  bs.newAtom.newNode
+
 proc mapFunc(env: Env; args: NodeObj): NodeRef =
   assertArgCount(args, 2)
   result = newNodeList()
@@ -551,6 +560,7 @@ proc newEnv(store: BlobStore): Env =
   result.bindEnv "ingest", ingestFunc
   result.bindEnv "insert", insertFunc
   result.bindEnv "key", keyFunc
+  result.bindEnv "load", loadFunc
   result.bindEnv "list", listFunc
   result.bindEnv "map", mapFunc
   #result.bindEnv "merge", mergeFunc
@@ -638,7 +648,7 @@ proc replMain() =
       scripted = true
   let
     #store = openStore()
-    store = newNullStore() # newFileStore("/tmp/blobs")
+    store = newFileStore("/tmp/blobs")
     env = newEnv(store)
     outStream = stdout.newFileStream
     readLine = if scripted: readLineSimple else: readLineFromStdin
diff --git a/src/blobsets.nim b/src/blobsets.nim
index 3542d22..b375ad3 100644
--- a/src/blobsets.nim
+++ b/src/blobsets.nim
@@ -36,8 +36,7 @@ func `$`*(bh: BlobId): string =
     let r = (Rune)baseRune or b.int
     fastToUTF8Copy(r, result, pos, true)
 
-func toBlobId*(s: string): BlobId =
-  ## Parse a visual blob hash to binary.
+func parseStringId[T](s: string): T =
   if s.len == blobVisualLen:
     var
       pos: int
@@ -46,6 +45,28 @@ func toBlobId*(s: string): BlobId =
       fastRuneAt(s, pos, r, true)
       b = r.byte
 
+func parseCborId[T](c: CborNode): T =
+  ## Parse a CBOR node to binary.
+  if c.bytes.len == result.data.len:
+    for i in 0..result.data.high:
+      result.data[i] = c.bytes[i]
+
+func toBlobId*(s: string): BlobId =
+  ## Parse a visual blob hash to binary.
+  parseStringId[BlobId] s
+
+func toBlobId(c: CborNode): BlobId =
+  ## Parse a CBOR blob hash to binary.
+  parseCborId[BlobId] c
+
+func toSetId*(s: string): SetId =
+  ## Parse a visual set hash to binary.
+  parseStringId[SetId] s
+
+func toSetId(c: CborNode): SetId =
+  ## Parse a CBOR set hash to binary.
+  parseCborId[SetId] c
+
 proc `==`*(x, y: BlobId): bool = x.data == y.data
   ## Compare two BlobIds.
 
@@ -182,14 +203,19 @@ func toCbor(k: Key): CborNode =
   ## Keys are endian independent.
   newCborBytes cast[array[sizeof(k), byte]](k)
 
+const
+  # CBOR tags
+  nodeTag = 0
+  leafTag = 1
+
 type
-  setKind* = enum hotNode, coldNode, leafNode
+  SetKind* = enum hotNode, coldNode, leafNode
   BlobSet* = ref BlobSetObj
   BlobSetObj = object
-    case kind*: setKind
+    case kind*: SetKind
     of hotNode:
       bitmap: Key
-      table*: seq[BlobSet]
+      table: seq[BlobSet]
     of coldNode:
       setId*: SetId
     of leafNode:
@@ -351,33 +377,51 @@ func remove*(trie: BlobSet; name: string): BlobSet =
       result = newBlobSet()
 
 func toCbor*(x: BlobSet): CborNode =
-  const
-    nodeTag = 0
-    leafTag = 1
-  let array = newCborArray()
   case x.kind
   of hotNode:
-    var
-      map = x.bitmap
-      buf = newCborBytes(sizeof(Key))
-    when not sizeof(Key) == 8:
-      {.error: "unknown key conversion".}
-    bigEndian64(buf.bytes[0].addr, map.addr)
-    array.add buf
+    let array = newCborArray()
+    array.add x.bitmap
     for y in x.table:
       array.add y.toCbor
     newCborTag(nodeTag, array)
   of coldNode:
-    array.add x.setId.data
-    newCborTag(nodeTag, array)
+    newCborTag(nodeTag, x.setId.data.newCborBytes)
   of leafNode:
-    array.add x.key.toCbor
+    let array = newCborArray()
+    array.add x.key
     array.add x.blob.data
     array.add x.size
     newCborTag(leafTag, array)
 
 func leafCount*(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize
 
+func compressTree*(leaves: var seq[BlobId]) =
+  var
+    ctx: Blake2b256
+    nodeOffset = 0
+    nodeDepth = 0
+  while leaves.len > 1:
+    nodeOffset = 0
+    inc nodeDepth
+    var pos, next: int
+    while pos < leaves.len:
+      ctx.init do (params: var Blake2bParams):
+        params.fanout = 2
+        params.depth = 255
+        params.leafLength = blobLeafSize
+        params.nodeOffset = nodeOffset
+        params.nodeDepth = nodeDepth
+      inc nodeOffset
+      ctx.update(leaves[pos].data)
+      inc pos
+      if pos < leaves.len:
+        ctx.update(leaves[pos].data)
+        inc pos
+      leaves[next] = ctx.finish()
+      inc next
+    leaves.setLen(next)
+  # TODO: BLAKE2 tree finalization flags
+
 type
   BlobKind* = enum
     dataBlob, metaBlob
@@ -421,98 +465,8 @@ type
     openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream {.nimcall, gcsafe.}
     openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream {.nimcall, gcsafe.}
-
-proc close*(s: BlobStore) =
-  ## Close active store resources.
-  if not s.closeImpl.isNil: s.closeImpl(s)
-
-proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
-  ## Return a new `BlobStream` for reading a blob.
-  assert(not s.openBlobStreamImpl.isNil)
-  s.openBlobStreamImpl(s, id, size, kind)
-
-proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
-  ## Return a new `IngestStream` for ingesting a blob.
-  assert(not s.openIngestStreamImpl.isNil)
-  s.openIngestStreamImpl(s, size, kind)
-
-func compressTree*(leaves: var seq[BlobId]) =
-  var
-    ctx: Blake2b256
-    nodeOffset = 0
-    nodeDepth = 0
-  while leaves.len > 1:
-    nodeOffset = 0
-    inc nodeDepth
-    var pos, next: int
-    while pos < leaves.len:
-      ctx.init do (params: var Blake2bParams):
-        params.fanout = 2
-        params.depth = 255
-        params.leafLength = blobLeafSize
-        params.nodeOffset = nodeOffset
-        params.nodeDepth = nodeDepth
-      inc nodeOffset
-      ctx.update(leaves[pos].data)
-      inc pos
-      if pos < leaves.len:
-        ctx.update(leaves[pos].data)
-        inc pos
-      leaves[next] = ctx.finish()
-      inc next
-    leaves.setLen(next)
-  # TODO: BLAKE2 tree finalization flags
-
-iterator dumpBlob*(store: BlobStore; id: BlobId): string =
-  var
-    stream = store.openBlobStream(id, kind=dataBlob)
-    buf = newString(blobLeafSize)
-  defer:
-    close stream
-  while true:
-    buf.setLen(blobLeafSize)
-    let n = stream.read(buf[0].addr, buf.len)
-    if n == 0:
-      break
-    buf.setLen(n)
-    yield buf
-
-proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
-  assert(bs.kind == hotNode)
-  for e in bs.table.mitems:
-    case e.kind
-    of coldNode, leafNode: discard
-    of hotNode:
-      e = store.commit e
-  let stream = store.openIngestStream(kind=metaBlob)
-  var buf = encode bs.toCbor
-  stream.ingest(buf)
-  let (id, _) = finish stream
-  result = BlobSet(kind: coldNode, setId: id)
-
-proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
-  # TODO: lazy-load set
-  bs.apply(name, f)
-
-proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
-  # TODO: lazy-load set
-  insert(bs, name, blob, size)
-
-proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
-  # TODO: lazy-load set
-  remove(bs, name)
-
-proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
-  ## Return the union of `sets`.
-  # TODO: lazy-load set
-  var fresh = newBlobSet()
-  proc freshInsert(leaf: BlobSet) =
-    fresh = insert(fresh, leaf)
-  for bs in sets:
-    assert(not bs.isnil)
-    bs.apply(freshInsert)
-  result = fresh
-
-# Store implementations
+
+#
+# Null Store implementation
 #
 
 type
@@ -567,3 +521,106 @@ proc newNullStore*(): BlobStore =
   BlobStore(
     openBlobStreamImpl: nullOpenBlobStream,
     openIngestStreamImpl: nullOpenIngestStream)
+
+proc close*(s: BlobStore) =
+  ## Close active store resources.
+  if not s.closeImpl.isNil: s.closeImpl(s)
+
+proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
+  ## Return a new `BlobStream` for reading a blob.
+  assert(not s.openBlobStreamImpl.isNil)
+  s.openBlobStreamImpl(s, id, size, kind)
+
+proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
+  ## Return a new `IngestStream` for ingesting a blob.
+  assert(not s.openIngestStreamImpl.isNil)
+  s.openIngestStreamImpl(s, size, kind)
+
+iterator dumpBlob*(store: BlobStore; id: BlobId): string =
+  var
+    stream = store.openBlobStream(id, kind=dataBlob)
+    buf = newString(blobLeafSize)
+  defer:
+    close stream
+  while true:
+    buf.setLen(blobLeafSize)
+    let n = stream.read(buf[0].addr, buf.len)
+    if n == 0:
+      break
+    buf.setLen(n)
+    yield buf
+
+proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
+  if Key.high shr depth == 0:
+    raiseAssert("loadSet trie is too deep")
+  var
+    stream = store.openBlobStream(id, kind=metaBlob)
+    buf = newString(blobLeafSize)
+  defer:
+    close stream
+  let n = stream.read(buf[0].addr, buf.len)
+  buf.setLen(n)
+  let
+    c = buf.parseCbor.val
+    bitmap = c.seq[0].getInt
+  if bitmap.countSetBits != c.seq.len-1:
+    let bits = bitmap.countSetBits
+    raise newException(ValueError, "invalid set CBOR, bitmap is " & $bits & " and sequence len is " & $c.seq.len)
+  result = BlobSet(
+    kind: hotNode,
+    bitmap: bitmap,
+    table: newSeqOfCap[BlobSet](c.seq.len-1))
+  for i in 1..c.seq.high:
+    let node = c[i].val
+    case c[i].tag.int
+    of nodeTag:
+      result.table.add loadSet(store, node.toSetId, depth+1)
+    of leafTag:
+      let
+        leaf = BlobSet(
+          kind: leafNode,
+          key: getInt node[0],
+          blob: parseCborId[BlobId] node[1],
+          size: getInt node[2])
+      result.table.add leaf
+    else:
+      raise newException(ValueError, "invalid set CBOR")
+
+proc loadSet*(store: BlobStore; id: SetId): BlobSet =
+  loadSet store, id, 0
+
+proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
+  assert(bs.kind == hotNode)
+  for e in bs.table.mitems:
+    case e.kind
+    of coldNode, leafNode: discard
+    of hotNode:
+      e = store.commit e
+  let stream = store.openIngestStream(kind=metaBlob)
+  var buf = encode bs.toCbor
+  stream.ingest(buf)
+  let (id, _) = finish stream
+  result = BlobSet(kind: coldNode, setId: id)
+
+proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
+  # TODO: lazy-load set
+  bs.apply(name, f)
+
+proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
+  # TODO: lazy-load set
+  insert(bs, name, blob, size)
+
+proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
+  # TODO: lazy-load set
+  remove(bs, name)
+
+proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
+  ## Return the union of `sets`.
+  # TODO: lazy-load set
+  var fresh = newBlobSet()
+  proc freshInsert(leaf: BlobSet) =
+    fresh = insert(fresh, leaf)
+  for bs in sets:
+    assert(not bs.isnil)
+    bs.apply(freshInsert)
+  result = fresh
diff --git a/src/blobsets/filestores.nim b/src/blobsets/filestores.nim
index f6675e4..18b8682 100644
--- a/src/blobsets/filestores.nim
+++ b/src/blobsets/filestores.nim
@@ -51,7 +51,11 @@ proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind
   var fs = FileStore(s)
   let stream = FsBlobStream()
   result = stream
-  stream.path = fs.root / $id
+  case kind
+  of dataBlob:
+    stream.path = fs.root / "data" / $id
+  of metaBlob:
+    stream.path = fs.root / "blob" / $id
   stream.file = openAsync(stream.path, fmRead)
   stream.closeImpl = fsBlobClose
   stream.readImpl = fsBlobRead
@@ -108,8 +112,8 @@ proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestS
   stream.leaves = newSeq[BlobId]()
 
 proc newFileStore*(root = "/"): FileStore =
-  if not existsDir(root):
-    createDir root
+  createDir(root / "data")
+  createDir(root / "blob")
   new result
   result.openBlobStreamImpl = fsOpenBlobStream
   result.openIngestStreamImpl = fsOpenIngestStream
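
A minimal round-trip sketch of the store API this patch introduces, assuming the `blobsets` and `filestores` modules above; the store path, the payload, and the `demo` wrapper are illustrative only, not part of the patch:

  import ./blobsets, ./blobsets/filestores

  proc demo() =
    # File stores now keep data and metadata blobs apart ("data"/ and "blob"/).
    let store = newFileStore("/tmp/blobs")

    # Ingest a data blob to obtain the BlobId and size for a leaf.
    let ingest = store.openIngestStream(kind = dataBlob)
    var payload = "hello blobsets"
    ingest.ingest(payload)
    let (blobId, blobSize) = finish ingest

    # Build a hot (in-memory) set, then commit it as a CBOR metadata blob.
    var bs = newBlobSet()
    bs = store.insert(bs, "greeting.txt", blobId, blobSize)
    let cold = store.commit(bs)      # a coldNode carrying the new SetId

    # loadSet parses the committed CBOR node back into a hot trie.
    let reloaded = store.loadSet(cold.setId)
    assert(reloaded.kind == hotNode)

  demo()

In the REPL wired up in blobset.nim, the new "commit" and "load" bindings expose the same pair of operations.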