import std/asyncdispatch, std/asyncstreams
import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians
import std/random
import cbor, siphash
import ./blobsets/priv/hex, ./blobsets/priv/nimcrypto, ./blobsets/priv/nimcrypto/blake2

const
  digestLen* = 32 ## Length of a chunk digest.
  cidSize* = digestLen ## Size of CID object in memory.
  blobLeafSize* = 1 shl 14 ## Size of blob leaves.
  blobLeafSizeMask* = not(not(0) shl 14)
  blobHexLen* = 32 * 2
  blobVisualLen* = 32 * 3

type
  Blake2b256* = Blake2bContext[256]

  BlobId* = MDigest[Blake2b256.bits] ## Blob identifier.
  SetId* = MDigest[Blake2b256.bits] ## Set identifier.
  Cid* {.deprecated.} = BlobId

func `$`*(bh: BlobId): string =
  ## Convert a blob hash to a visual representation:
  ## one braille rune (U+2800..U+28FF) per digest byte.
  const baseRune = 0x2800
  result = newString(blobVisualLen)
  var pos = 0
  for b in bh.data.items:
    let r = Rune(baseRune or b.int)
    fastToUTF8Copy(r, result, pos, true)

func parseStringId[T](s: string): T =
  case s.len
  of blobHexLen:
    hex.decode s, result.data
  of blobVisualLen:
    var
      pos: int
      r: Rune
    for b in result.data.mitems:
      fastRuneAt(s, pos, r, true)
      b = r.byte
  else:
    raise newException(ValueError, "invalid blobset id encoding")

func parseCborId[T](c: CborNode): T =
  ## Parse a CBOR node to binary.
  if c.bytes.len == result.data.len:
    for i in 0..result.data.high:
      result.data[i] = c.bytes[i]

func toBlobId*(s: string): BlobId =
  ## Parse a visual blob hash to binary.
  parseStringId[BlobId] s

func toBlobId(c: CborNode): BlobId =
  ## Parse a CBOR blob hash to binary.
  parseCborId[BlobId] c

func toSetId*(s: string): SetId =
  ## Parse a visual set hash to binary.
  parseStringId[SetId] s

func toSetId(c: CborNode): SetId =
  ## Parse a CBOR set hash to binary.
  parseCborId[SetId] c

proc `==`*(x, y: BlobId): bool =
  ## Compare two BlobIds.
  x.data == y.data

proc `==`*(cbor: CborNode; cid: BlobId): bool =
  ## Compare a CBOR node with a BlobId.
  if cbor.kind == cborBytes and cbor.bytes.len == cid.data.len:
    result = true
    for i in 0..cid.data.high:
      if cbor.bytes[i] != cid.data[i]:
        return false
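# A small round-trip sketch (not part of the original module), using only
# the procs above: a default, all-zero BlobId renders as 32 braille runes
# via `$`, and `toBlobId` parses that visual form back to the same id.
when isMainModule:
  block visualRoundTrip:
    var id: BlobId              # zero digest, purely for illustration
    let visual = $id            # 32 runes from the U+2800 block, 3 bytes each
    doAssert visual.len == blobVisualLen
    doAssert toBlobId(visual) == id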
proc compressTree(leaves: var openArray[BlobId]) =
  ## Compress a layer of leaf digests pairwise until a single root remains.
  var
    ctx: Blake2b256
    nodeOffset = 0
    nodeDepth = 0
    len = leaves.len
  while len > 1:
    nodeOffset = 0
    inc nodeDepth
    var pos, next: int
    while pos < len:
      ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = nodeOffset
        params.nodeDepth = nodeDepth
      inc nodeOffset
      ctx.update(leaves[pos].data)
      inc pos
      if pos < len:
        ctx.update(leaves[pos].data)
        inc pos
      leaves[next] = ctx.finish()
      inc next
    len = next
  # TODO: BLAKE2 tree finalization flags

proc blobHash*(s: string): BlobId =
  ## Hash a string that fits within a single blob leaf.
  doAssert(s.len <= blobLeafSize)
  var
    ctx: Blake2b256
    leaves: array[1, BlobId]
  ctx.init do (params: var Blake2bParams):
    params.fanout = 2
    params.depth = 255
    params.leafLength = blobLeafSize
    params.nodeOffset = 0
  if s.len > 0:
    ctx.update(unsafeAddr s[0], s.len)
  leaves[0] = finish ctx
  compressTree(leaves)
  leaves[0]

proc commit*(store: BlobStore; bs: BlobSet): Future[BlobSet] {.async.} =
  if bs.isCold: return bs
  let tmp = BlobSet(kind: hotNode, bitmap: bs.bitmap, table: bs.table)
  for e in tmp.table.mitems:
    if e.isHot:
      let cold = await store.commit e
      assert(not cold.isNil)
      e = cold
  var buf = encode tmp.toCbor
  let
    localId = blobHash(buf)
    present = await store.contains(localId, metaBlob)
  if present:
    return BlobSet(kind: coldNode, setId: localId)
  else:
    let stream = store.openIngestStream(size = buf.len, kind = metaBlob)
    await stream.ingest(buf)
    let (storeId, _) = await finish(stream)
    assert(localId == storeId)
    return BlobSet(kind: coldNode, setId: storeId)

#
# Null Store implementation
#

type
  NullIngestStream = ref NullIngestStreamObj
  NullIngestStreamObj = object of IngestStreamObj
    ctx: Blake2b256
    leaves: seq[BlobId]
    pos, nodeOffset: BiggestInt

proc nullBlobClose(s: BlobStream) = discard

proc setPosNull(s: BlobStream; pos: BiggestInt) = discard

proc getPosNull(s: BlobStream): BiggestInt = discard

proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] =
  result = newFuture[int]()
  complete result, 0

proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
  BlobStream(
    closeImpl: nullBlobClose,
    setPosImpl: setPosNull,
    getPosImpl: getPosNull,
    readImpl: nullBlobRead)

proc nullFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
  var s = NullIngestStream(s)
  s.leaves.add finish(s.ctx)
  compressTree(s.leaves)
  var pair: tuple[id: BlobId, size: BiggestInt]
  pair.id = s.leaves[0]
  pair.size = s.pos
  result = newFuture[tuple[id: BlobId, size: BiggestInt]]()
  complete result, pair

proc nullIngest(s: IngestStream; buf: pointer; len: Natural): Future[void] =
  var
    s = NullIngestStream(s)
    off = 0
    buf = cast[ptr array[blobLeafSize, byte]](buf)
  while off < len:
    var n = min(blobLeafSize, len-off)
    let leafOff = int(s.pos and blobLeafSizeMask)
    if leafOff == 0:
      # Starting a new leaf: finish the previous one and re-key the context.
      if s.pos > 0:
        s.leaves.add finish(s.ctx)
      s.ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = s.nodeOffset
      inc s.nodeOffset
    else:
      n = min(n, blobLeafSize-leafOff)
    s.ctx.update(buf[off].addr, n)
    off.inc n
    s.pos.inc n
  result = newFuture[void]()
  complete result

proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
  NullIngestStream(
    finishImpl: nullFinish,
    ingestImpl: nullIngest,
    leaves: newSeq[BlobId]())

proc newNullStore*(): BlobStore =
  ## Create a store that discards all data while still computing ids.
  BlobStore(
    openBlobStreamImpl: nullOpenBlobStream,
    openIngestStreamImpl: nullOpenIngestStream)
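# A minimal end-to-end sketch (not part of the original module), assuming
# the `openIngestStream`, `ingest(string)`, and `finish` dispatch helpers
# that `commit` relies on above: hash a short string through the null store
# and check the id against `blobHash`, which must agree for single-leaf data.
when isMainModule:
  proc nullStoreDemo() {.async.} =
    let
      store = newNullStore()
      data = "Hello, blobsets!"    # well under blobLeafSize
      stream = store.openIngestStream(size = data.len, kind = metaBlob)
    await stream.ingest(data)
    let (id, size) = await finish(stream)
    doAssert size == data.len
    doAssert id == blobHash(data)
    echo $id
  waitFor nullStoreDemo()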