import std/asyncdispatch
import std/hashes, std/streams, std/strutils, std/bitops, std/random, std/unicode, std/endians

import cbor, siphash

import ./blobsets/priv/hex

import nimcrypto, nimcrypto/blake2

const
  digestLen* = 32
    ## Length of a chunk digest.
  cidSize* = digestLen
    ## Size of a CID object in memory.
  blobLeafSize* = 1 shl 14
    ## Size of blob leaves.
  blobLeafSizeMask* = not(not(0) shl 14)
  blobHexLen* = 32 * 2
  blobVisualLen* = 32 * 3
  maxChunkSize* {.deprecated.} = blobLeafSize

type
  Blake2b256* = Blake2bContext[256]

  BlobId* = MDigest[Blake2b256.bits]
    ## Blob identifier.
  SetId* = MDigest[Blake2b256.bits]
    ## Set identifier.
  Cid* {.deprecated.} = BlobId

func `$`*(bh: BlobId): string =
  ## Convert a blob hash to a visual representation.
  const baseRune = 0x2800
  result = newString(blobVisualLen)
  var pos = 0
  for b in bh.data.items:
    let r = Rune(baseRune or b.int)
    fastToUTF8Copy(r, result, pos, true)

func parseStringId[T](s: string): T =
  case s.len
  of blobHexLen:
    hex.decode s, result.data
  of blobVisualLen:
    var
      pos: int
      r: Rune
    for b in result.data.mitems:
      fastRuneAt(s, pos, r, true)
      b = r.byte
  else:
    raise newException(ValueError, "invalid blobset id encoding")

func parseCborId[T](c: CborNode): T =
  ## Parse a CBOR node to binary.
  if c.bytes.len == result.data.len:
    for i in 0..result.data.high:
      result.data[i] = c.bytes[i]

func toBlobId*(s: string): BlobId =
  ## Parse a visual blob hash to binary.
  parseStringId[BlobId] s

func toBlobId(c: CborNode): BlobId =
  ## Parse a CBOR blob hash to binary.
  parseCborId[BlobId] c

func toSetId*(s: string): SetId =
  ## Parse a visual set hash to binary.
  parseStringId[SetId] s

func toSetId(c: CborNode): SetId =
  ## Parse a CBOR set hash to binary.
  parseCborId[SetId] c

proc `==`*(x, y: BlobId): bool =
  ## Compare two BlobIds.
  x.data == y.data

proc `==`*(cbor: CborNode; cid: BlobId): bool =
  ## Compare a CBOR node with a BlobId.
  if cbor.kind == cborBytes and cbor.bytes.len == cid.data.len:
    for i in 0..cid.data.high:
      if cid.data[i] != cbor.bytes[i]:
        return false
    result = true
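# A quick round-trip sketch of the identifier encodings, assuming the
# default (all-zero) digest: each digest byte maps into the Braille block
# at U+2800, so the visual form is 32 runes (96 UTF-8 bytes) that parse
# back to the same BlobId.
when isMainModule:
  var id: BlobId                  # default-initialized, all-zero digest
  let visual = $id
  assert visual.len == blobVisualLen
  assert visual.toBlobId == id    # the visual encoding round-trips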
func compressTree*(leaves: var seq[BlobId]) =
  ## Compress a sequence of leaf digests into a single tree root.
  var
    ctx: Blake2b256
    nodeOffset = 0
    nodeDepth = 0
  while leaves.len > 1:
    nodeOffset = 0
    inc nodeDepth
    var pos, next: int
    while pos < leaves.len:
      ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = nodeOffset
        params.nodeDepth = nodeDepth
      inc nodeOffset
      ctx.update(leaves[pos].data)
      inc pos
      if pos < leaves.len:
        # hash the second child of the pair; a trailing odd node is hashed alone
        ctx.update(leaves[pos].data)
        inc pos
      leaves[next] = ctx.finish()
      inc next
    leaves.setLen(next)
  # TODO: BLAKE2 tree finalization flags

type
  BlobKind* = enum
    dataBlob, metaBlob

proc `$`*(k: BlobKind): string =
  case k
  of dataBlob: "data"
  of metaBlob: "meta"

type
  BlobStream* = ref BlobStreamObj
  BlobStreamObj* = object of RootObj
    closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.}
    sizeImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
    setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.}
    getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
    readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): Future[int] {.nimcall, gcsafe.}

  IngestStream* = ref IngestStreamObj
  IngestStreamObj* = object of RootObj
    cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.}
    finishImpl*: proc (s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.nimcall, gcsafe.}
    ingestImpl*: proc (s: IngestStream; buf: pointer; size: int): Future[void] {.nimcall, gcsafe.}

proc close*(s: BlobStream) =
  assert(not s.closeImpl.isNil)
  s.closeImpl(s)

proc size*(s: BlobStream): BiggestInt =
  assert(not s.sizeImpl.isNil)
  s.sizeImpl(s)

proc `pos=`*(s: BlobStream; pos: BiggestInt) =
  assert(not s.setPosImpl.isNil)
  s.setPosImpl(s, pos)

proc pos*(s: BlobStream): BiggestInt =
  assert(not s.getPosImpl.isNil)
  s.getPosImpl(s)

proc read*(s: BlobStream; buf: pointer; len: Natural): Future[int] =
  assert(not s.readImpl.isNil)
  s.readImpl(s, buf, len)

proc cancel*(s: IngestStream) =
  ## Cancel and close an ingest stream.
  assert(not s.cancelImpl.isNil)
  s.cancelImpl(s)

proc finish*(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
  ## Finish an ingest stream, returning the blob identifier and size.
  assert(not s.finishImpl.isNil)
  s.finishImpl(s)

proc ingest*(s: IngestStream; buf: pointer; size: Natural): Future[void] =
  ## Ingest a buffer into a stream.
  assert(not s.ingestImpl.isNil)
  s.ingestImpl(s, buf, size)

proc ingest*(s: IngestStream; buf: string): Future[void] =
  ## Ingest a string into a stream.
  assert(not s.ingestImpl.isNil)
  s.ingestImpl(s, buf[0].unsafeAddr, buf.len)

type
  BlobStore* = ref BlobStoreObj
  BlobStoreObj* = object of RootObj
    closeImpl*: proc (s: BlobStore) {.nimcall, gcsafe.}
    openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream {.nimcall, gcsafe.}
    openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream {.nimcall, gcsafe.}
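# A minimal sketch of how the impl-pointer ("manual vtable") pattern above
# is filled in: a hypothetical in-memory BlobStream that serves a string.
# `StringBlobStream`, `sbsRead`, and `newStringBlobStream` are illustrative
# names, not part of the library; only the impl pointers a backend supports
# are assigned, the rest stay nil.
when isMainModule:
  type
    StringBlobStream = ref object of BlobStreamObj
      data: string
      off: int

  proc sbsClose(s: BlobStream) = discard

  proc sbsRead(s: BlobStream; buffer: pointer; bufLen: int): Future[int] =
    var s = StringBlobStream(s)
    let n = min(bufLen, s.data.len - s.off)
    if n > 0:
      copyMem(buffer, s.data[s.off].addr, n)
      inc(s.off, n)
    result = newFuture[int]()
    complete result, n

  proc newStringBlobStream(data: string): BlobStream =
    StringBlobStream(data: data, closeImpl: sbsClose, readImpl: sbsRead)

  let src = newStringBlobStream("sixteen byte str")
  var tmp = newString(8)
  let n = waitFor src.read(tmp[0].addr, tmp.len)
  assert n == 8 and tmp == "sixteen "
  close src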
#
# Null Store implementation
#

type
  NullIngestStream = ref NullIngestStreamObj
  NullIngestStreamObj = object of IngestStreamObj
    ctx: Blake2b256
    leaves: seq[BlobId]
    pos, nodeOffset: BiggestInt

proc nullBlobClose(s: BlobStream) = discard

proc setPosNull(s: BlobStream; pos: BiggestInt) = discard

proc getPosNull(s: BlobStream): BiggestInt = discard

proc nullBlobRead(s: BlobStream; buffer: pointer; bufLen: int): Future[int] =
  result = newFuture[int]()
  complete result, 0

proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
  BlobStream(
    closeImpl: nullBlobClose,
    setPosImpl: setPosNull,
    getPosImpl: getPosNull,
    readImpl: nullBlobRead)

proc nullFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
  var s = NullIngestStream(s)
  # hash the final partial leaf and reduce the leaves to a tree root
  s.leaves.add finish(s.ctx)
  compressTree(s.leaves)
  var pair: tuple[id: BlobId, size: BiggestInt]
  pair.id = s.leaves[0]
  pair.size = s.pos
  result = newFuture[tuple[id: BlobId, size: BiggestInt]]()
  complete result, pair

proc nullIngest(s: IngestStream; buf: pointer; len: int): Future[void] =
  var
    s = NullIngestStream(s)
    off = 0
    buf = cast[ptr UncheckedArray[byte]](buf)
  while off < len:
    var n = min(blobLeafSize, len-off)
    let leafOff = int(s.pos and blobLeafSizeMask)
    if leafOff == 0:
      # starting a fresh leaf: finish the previous one and re-init the context
      if s.pos > 0:
        s.leaves.add finish(s.ctx)
      s.ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = s.nodeOffset
      inc s.nodeOffset
    else:
      n = min(n, blobLeafSize-leafOff)
    s.ctx.update(buf[off].addr, n)
    off.inc n
    s.pos.inc n
  result = newFuture[void]()
  complete result

proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
  NullIngestStream(
    finishImpl: nullFinish,
    ingestImpl: nullIngest,
    leaves: newSeq[BlobId]())

proc newNullStore*(): BlobStore =
  BlobStore(
    openBlobStreamImpl: nullOpenBlobStream,
    openIngestStreamImpl: nullOpenIngestStream)

proc close*(s: BlobStore) =
  ## Close active store resources.
  if not s.closeImpl.isNil:
    s.closeImpl(s)

proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
  ## Return a new `BlobStream` for reading a blob.
  assert(not s.openBlobStreamImpl.isNil)
  s.openBlobStreamImpl(s, id, size, kind)

proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
  ## Return a new `IngestStream` for ingesting a blob.
  assert(not s.openIngestStreamImpl.isNil)
  s.openIngestStreamImpl(s, size, kind)

iterator dumpBlob*(store: BlobStore; id: BlobId): string =
  ## Iterate over the contents of a blob, one leaf at a time.
  var
    stream = store.openBlobStream(id, kind=dataBlob)
    buf = newString(blobLeafSize)
  defer:
    close stream
  while true:
    buf.setLen(blobLeafSize)
    let n = waitFor stream.read(buf[0].addr, buf.len)
    if n == 0:
      break
    buf.setLen(n)
    yield buf

proc loadSet(store: BlobStore; id: SetId; depth: int): Future[BlobSet] {.async.} =
  if (not Key(0)) shr depth == 0:
    raiseAssert("loadSet trie is too deep")
  var
    stream = store.openBlobStream(id, kind=metaBlob)
    buf = newString(blobLeafSize)
  defer:
    close stream
  let n = await stream.read(buf[0].addr, buf.len)
  assert(n != 0, "read zero of set " & $id)
  buf.setLen(n)
  let
    tagPair = parseCbor buf
    c = tagPair.val
    bitmap = c.seq[0].getInt.uint64
  if bitmap.countSetBits != c.seq.len-1:
    let bits = bitmap.countSetBits
    raise newException(ValueError,
      "invalid set CBOR, bitmap has " & $bits &
      " bits and sequence len is " & $c.seq.len)
  result = BlobSet(
    kind: hotNode,
    bitmap: bitmap,
    table: newSeqOfCap[BlobSet](c.seq.len-1))
  for i in 1..c.seq.high:
    let node = c[i].val
    case c[i].tag.int
    of nodeTag:
      let child = await loadSet(store, node.toSetId, depth+1)
      result.table.add child
    of leafTag:
      let leaf = BlobSet(
        kind: leafNode,
        key: getNum[Key] node[0],
        blob: parseCborId[BlobId] node[1],
        size: getInt node[2])
      result.table.add leaf
    else:
      raise newException(ValueError, "invalid set CBOR")

proc load*(store: BlobStore; id: SetId): BlobSet =
  waitFor loadSet(store, id, 0)

proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
  if bs.kind == coldNode: return bs
  let tmp = BlobSet(kind: hotNode, bitmap: bs.bitmap, table: bs.table)
  for e in tmp.table.mitems:
    if e.isHot: e = store.commit e
  let stream = store.openIngestStream(kind=metaBlob)
  var buf = encode tmp.toCbor
  waitFor stream.ingest(buf)
  let (id, _) = waitFor finish(stream)
  result = BlobSet(kind: coldNode, setId: id)
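# Sketch: ingesting a small blob through the null store, which hashes its
# input and discards the data. The identifier returned is the BLAKE2b-256
# tree root of the content, so it is stable across store backends. The
# sample string is illustrative.
when isMainModule:
  let
    store = newNullStore()
    stream = store.openIngestStream(kind = dataBlob)
  waitFor stream.ingest("Hello, blobsets!")
  let (ingestId, ingestSize) = waitFor stream.finish()
  assert ingestSize == 16
  echo "ingested ", ingestSize, " bytes as ", ingestId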
proc apply*(store: BlobStore; bs: BlobSet; name: string;
            f: proc (id: BlobId; size: BiggestInt)) =
  # TODO: lazy-load set
  doAssert(bs.kind == hotNode)
  apply(bs, name, f)

proc insert*(store: BlobStore; bs: BlobSet; name: string;
             blob: BlobId; size: BiggestInt): BlobSet =
  # TODO: lazy-load set
  insert(bs, name, blob, size)

proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
  # TODO: lazy-load set
  remove(bs, name)

proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
  ## Return the union of `sets`.
  # TODO: lazy-load set
  var fresh = newBlobSet()
  proc freshInsert(leaf: BlobSet) =
    fresh = insert(fresh, leaf)
  for bs in sets:
    assert(not bs.isNil)
    bs.apply(freshInsert)
  result = fresh

proc randomApply*(store: BlobStore; trie: BlobSet; seed: int64;
                  f: proc(id: BlobId; size: BiggestInt)) =
  ## Apply `f` to a random leaf if the set is not empty.
  var
    rng = initRand(seed)
    trie = trie
    i = rng.rand(max(1, countSetBits(trie.bitmap))-1)
  while trie.bitmap != 0:
    let next = trie.table[i]
    case next.kind
    of leafNode:
      f(next.blob, next.size)
      break
    of coldNode:
      trie.table[i] = store.load(next.setId)
    of hotNode:
      trie = next
      i = rng.rand(countSetBits(trie.bitmap)-1)
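# Sketch of picking an arbitrary member: build a small hot set with the
# in-memory trie API (`newBlobSet` and the four-argument `insert` defined
# earlier in this module) and let `randomApply` walk to a random leaf.
# The store argument is only consulted when a cold node must be loaded,
# so the null store suffices here; the name and seed are illustrative.
when isMainModule:
  proc report(id: BlobId; size: BiggestInt) =
    echo "picked ", id, " of ", size, " bytes"
  var bs = newBlobSet()
  var blob: BlobId              # placeholder all-zero digest
  bs = insert(bs, "example.txt", blob, 0)
  randomApply(newNullStore(), bs, seed = 42, report)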