From 1aba9dcd42d5f2e96d37e3dfe256e103bab2818f Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 21 Dec 2018 03:50:36 +0100 Subject: [PATCH] Ingest and dump blobs --- .gitignore | 12 +- blobsets.nimble | 12 + dagfs.nimble | 12 - src/{dagfs_repl.nim => blobset.nim} | 161 +++++++--- src/blobsets.nim | 301 ++++++++++++++++++ src/{dagfs => blobsets}/fsnodes.nim | 19 +- src/{dagfs => blobsets}/ipfsdaemon.nim | 2 +- src/{dagfs => blobsets}/priv/blake2.nim | 174 +++++----- src/{dagfs => blobsets}/priv/hex.nim | 0 src/{dagfs => blobsets}/replicator.nim | 2 +- src/blobsets/stores.nim | 403 ++++++++++++++++++++++++ src/{dagfs => blobsets}/tcp.nim | 16 +- src/dagfs.nim | 153 --------- src/dagfs/stores.nim | 155 --------- 14 files changed, 949 insertions(+), 473 deletions(-) create mode 100644 blobsets.nimble delete mode 100644 dagfs.nimble rename src/{dagfs_repl.nim => blobset.nim} (79%) create mode 100644 src/blobsets.nim rename src/{dagfs => blobsets}/fsnodes.nim (95%) rename src/{dagfs => blobsets}/ipfsdaemon.nim (98%) rename src/{dagfs => blobsets}/priv/blake2.nim (52%) rename src/{dagfs => blobsets}/priv/hex.nim (100%) rename src/{dagfs => blobsets}/replicator.nim (98%) create mode 100644 src/blobsets/stores.nim rename src/{dagfs => blobsets}/tcp.nim (94%) delete mode 100644 src/dagfs.nim delete mode 100644 src/dagfs/stores.nim diff --git a/.gitignore b/.gitignore index cae04ff..0fcee89 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,2 @@ nimcache -ipldrepl -/dagfs_repl -/genode/dagfs_genode/dagfs_fs -/genode/dagfs_genode/dagfs_fs_store -/genode/dagfs_genode/dagfs_rom -/genode/dagfs_genode/dagfs_tcp_store -/genode/dagfs_genode/bin/dagfs_fs -/genode/dagfs_genode/bin/dagfs_fs_store -/genode/dagfs_genode/bin/dagfs_rom -/genode/dagfs_genode/bin/dagfs_server -/genode/dagfs_genode/bin/dagfs_tcp_store +blobset diff --git a/blobsets.nimble b/blobsets.nimble new file mode 100644 index 0000000..fe76a5c --- /dev/null +++ b/blobsets.nimble @@ -0,0 +1,12 @@ +# Package + +version = "0.1.2" +author = "Emery Hemingway" +description = "Sets of named blobs" +license = "AGPLv3" +srcDir = "src" + +requires "nim >= 0.18.0", "base58", "cbor >= 0.5.1", "siphash", "nimcrypto" + +bin = @["blobset"] +skipFiles = @["blobset.nim"] diff --git a/dagfs.nimble b/dagfs.nimble deleted file mode 100644 index c73e6a6..0000000 --- a/dagfs.nimble +++ /dev/null @@ -1,12 +0,0 @@ -# Package - -version = "0.1.2" -author = "Emery Hemingway" -description = "A simple content addressed file-system" -license = "GPLv3" -srcDir = "src" - -requires "nim >= 0.18.0", "base58", "cbor >= 0.5.1", "siphash" - -bin = @["dagfs_repl"] -skipFiles = @["dagfs_repl.nim"] diff --git a/src/dagfs_repl.nim b/src/blobset.nim similarity index 79% rename from src/dagfs_repl.nim rename to src/blobset.nim index 2801752..eecf945 100644 --- a/src/dagfs_repl.nim +++ b/src/blobset.nim @@ -1,17 +1,25 @@ -import nre, os, strutils, tables, parseopt, streams, cbor +when not isMainModule: + {.error: "this module is not a library, import blobsets instead".} -import ./dagfs, ./dagfs/stores, ./dagfs/fsnodes +import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin +import cbor +import ./blobsets, ./blobsets/stores, ./blobsets/fsnodes + +when defined(genode): + import dagfsclient +else: + import ./blobsets/tcp type EvalError = object of CatchableError -type Env = ref EnvObj AtomKind = enum atomPath atomCid atomString + atomNum atomSymbol atomError @@ -24,6 +32,8 @@ type cid: Cid of atomString: str: string + of atomNum: + num: 
BiggestInt of atomSymbol: sym: string of atomError: @@ -55,8 +65,9 @@ type nextRef: NodeRef EnvObj = object - store: DagfsStore + store: BlobStore bindings: Table[string, NodeObj] + blobs: Table[string, tuple[id: BlobId, size: BiggestInt]] paths: Table[string, FsNode] cids: Table[Cid, FsNode] @@ -79,6 +90,9 @@ proc newAtomPath(s: string): Atom = proc newAtomString(s: string): Atom = Atom(kind: atomString, str: s) +proc newAtom(i: Natural): Atom = + Atom(kind: atomNum, num: i) + proc newNodeError(msg: string; n: NodeObj): NodeRef = var p = new NodeRef p[] = n @@ -139,6 +153,13 @@ template returnError(n: NodeObj) = if n.atom.kind == atomError: return n.atom.newNode +proc getBlob(env: Env; path: string): tuple[id: BlobId, size: BiggestInt] = + result = env.blobs.getOrDefault(path) + if result.size == 0: + result = env.store.ingestFile(path) + if result.size != 0: + env.blobs[path] = result + proc getFile(env: Env; path: string): FsNode = result = env.paths.getOrDefault path if result.isNil: @@ -201,6 +222,8 @@ proc print(a: Atom; s: Stream) = if not valid: break f.write chunk ]# + of atomNum: + s.write $a.num of atomSymbol: s.write a.sym of atomError: @@ -217,7 +240,7 @@ proc print(ast: NodeObj; s: Stream) = for n in ast.list: s.write " " n.print(s) - s.write ")" + s.write " )" of nodeFunc: s.write "# 48: - Atom(kind: atomCid, cid: token.parseCid) + Atom(kind: atomCid, cid: token.toBlobId) else: Atom(kind: atomSymbol, sym: token.normalize) #except: @@ -388,6 +411,14 @@ proc ingestFunc(env: Env; args: NodeObj): NodeRef = cid = env.store.putDag(root.toCbor) cid.newAtom.newNode +proc blobFunc(env: Env; args: NodeObj): NodeRef = + assertArgCount(args, 1) + let (blob, size) = env.getBlob args.atom.path + result = newNodeList() + result.append blob.newAtom.newNode + result.append size.newAtom.newNode + # TODO: natural number atom + proc listFunc(env: Env; args: NodeObj): NodeRef = ## Standard Lisp 'list' function. 
result = newNodeList() @@ -461,10 +492,11 @@ proc bindEnv(env: Env; name: string; fun: Func) = assert(not env.bindings.contains name) env.bindings[name] = NodeObj(kind: nodeFunc, fun: fun, name: name) -proc newEnv(store: DagfsStore): Env = +proc newEnv(store: BlobStore): Env = result = Env( store: store, bindings: initTable[string, NodeObj](), + blobs: initTable[string, tuple[id: BlobId, size: BiggestInt]](), paths: initTable[string, FsNode](), cids: initTable[Cid, FsNode]()) result.bindEnv "apply", applyFunc @@ -481,6 +513,7 @@ proc newEnv(store: DagfsStore): Env = result.bindEnv "path", pathFunc result.bindEnv "root", rootFunc result.bindEnv "walk", walkFunc + result.bindEnv "blob", blobFunc proc eval(ast: NodeRef; env: Env): NodeRef @@ -534,55 +567,37 @@ proc eval(ast: NodeRef; env: Env): NodeRef = except OSError: newNodeError(getCurrentExceptionMsg(), input) -var scripted = false +proc readLineSimple(prompt: string; line: var TaintedString): bool = + stdin.readLine(line) when defined(genode): - import dagfsclient - proc openStore(): DagfsStore = + proc openStore(): BlobStore = result = newDagfsClient("repl") - scripted = true # do not use linenoise for the moment - #[ - for kind, key, value in getopt(): - if kind == cmdShortOption and key == "s": - scripted = true - else: - quit "unhandled argument " & key - ]# else: - import ./dagfs/tcp - proc openStore(): DagfsStore = + proc openStore(): BlobStore = var host = "" for kind, key, value in getopt(): - case kind - of cmdShortOption: - if key == "s": - scripted = true - else: - quit "unhandled argument " & key - of cmdArgument: - if host != "": - quit "only a single store path argument is accepted" - host = key - else: - quit "unhandled argument " & key + if kind == cmdShortOption: + if key == "h": + if host != "": + quit "only a single store path argument is accepted" + host = value if host == "": host = "127.0.0.1" try: result = newTcpClient(host) except: quit("failed to connect to store at $1 ($2)" % [host, getCurrentExceptionMsg()]) - -import rdstdin - -proc readLineSimple(prompt: string; line: var TaintedString): bool = - stdin.readLine(line) - -proc main() = +proc replMain() = + var scripted: bool + for kind, key, value in getopt(): + if kind == cmdShortOption and key == "s": + scripted = true let - store = openStore() + #store = openStore() + store = newFileStore("/tmp/blobs") env = newEnv(store) outStream = stdout.newFileStream readLine = if scripted: readLineSimple else: readLineFromStdin - var reader = newReader() line = newStringOfCap 128 @@ -594,5 +609,67 @@ proc main() = outStream.write "\n" flush outStream +proc dumpMain() = + var args = newSeq[string]() + for kind, key, val in getopt(): + if kind == cmdArgument: + args.add key + if args.len > 1: + let store = newFileStore("/tmp/blobs") + for i in 1..args.high: + try: + for chunk in store.dumpBlob(args[i].toBlobId): + write(stdout, chunk) + except: + writeLine(stderr, "failed to dump '", args[i], "', ", getCurrentExceptionMsg()) + quit(-1) + +proc insertPath(set: BlobSet; store: BlobStore; kind: PathComponent; path: string) = + try: + case kind + of pcFile, pcLinkToFile: + let (id, size) = store.ingestFile(path) + set.insert(path, id, size) + writeLine(stdout, id, align($size, 11), " ", path) + of pcDir, pcLinkToDir: + for kind, subPath in path.walkDir: + set.insertPath(store, kind, normalizedPath subPath) + except: + let e = getCurrentException() + writeLine(stderr, "failed to ingest '", path, "', ", e.msg) + # raise e + +proc ingestMain() = + var args = newSeq[string]() + 
for kind, key, val in getopt(): + if kind == cmdArgument: + args.add key + if args.len > 1: + var set = newBlobSet() + #let store = newFileStore("/tmp/blobs") + let store = newNullStore() + for i in 1..args.high: + let path = normalizedPath args[i] + set.insertPath(store, path.getFileInfo.kind, path) + let final = store.commit set + writeLine(stdout, final.setId) + +proc main() = + var cmd = "" + for kind, key, val in getopt(): + if kind == cmdArgument: + cmd = key + break + case normalize(cmd) + of "": + quit("no subcommand specified") + #of "repl": + # replMain() + of "dump": + dumpMain() + of "ingest": + ingestMain() + else: + quit("no such subcommand ") + main() -quit 0 # Genode doesn't implicitly quit diff --git a/src/blobsets.nim b/src/blobsets.nim new file mode 100644 index 0000000..da452fe --- /dev/null +++ b/src/blobsets.nim @@ -0,0 +1,301 @@ +import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians +import base58/bitcoin, cbor, siphash +import ./blobsets/priv/hex + +import nimcrypto, nimcrypto/blake2 + +const + digestLen* = 32 + ## Length of a chunk digest. + cidSize* = digestLen + ## Size of CID object in memory + blobLeafSize* = 1 shl 14 + ## Size of blob leaves. + blobLeafSizeMask* = not(not(0) shl 14) + visualLen = 32 * 3 + + maxChunkSize* {.deprecated} = blobLeafSize + +type + Blake2b256* = Blake2bContext[256] + + BlobId* = MDigest[Blake2b256.bits] + ## Blob Identifier + SetId* = MDigest[Blake2b256.bits] + ## Set Identifier + + Cid* {.deprecated} = BlobId + +func `$`*(bh: BlobId): string = + ## Convert a blob hash to a visual representation. + const baseRune = 0x2800 + result = newString(visualLen) + var pos = 0 + for b in bh.data.items: + let r = (Rune)baseRune or b.int + fastToUTF8Copy(r, result, pos, true) + +func toBlobId*(s: string): BlobId = + ## Parse a visual blob hash to binary. + if s.len == visualLen: + var + pos: int + r: Rune + for b in result.data.mitems: + fastRuneAt(s, pos, r, true) + b = r.byte + +proc `==`*(x, y: BlobId): bool = x.data == y.data + ## Compare two BlobIds. + +proc `==`*(cbor: CborNode; cid: BlobId): bool = + ## Compare a CBOR node with a BlobId. + if cbor.kind == cborBytes: + for i in 0.. 
-1) diff --git a/src/dagfs/ipfsdaemon.nim b/src/blobsets/ipfsdaemon.nim similarity index 98% rename from src/dagfs/ipfsdaemon.nim rename to src/blobsets/ipfsdaemon.nim index 046d21d..a643c05 100644 --- a/src/dagfs/ipfsdaemon.nim +++ b/src/blobsets/ipfsdaemon.nim @@ -1,6 +1,6 @@ import httpclient, json, base58/bitcoin, streams, cbor, tables -import ../dagfs, ./stores, ./fsnodes +import ../blobsets, ./stores, ./fsnodes type IpfsStore* = ref IpfsStoreObj diff --git a/src/dagfs/priv/blake2.nim b/src/blobsets/priv/blake2.nim similarity index 52% rename from src/dagfs/priv/blake2.nim rename to src/blobsets/priv/blake2.nim index a0a8327..45cbee8 100644 --- a/src/dagfs/priv/blake2.nim +++ b/src/blobsets/priv/blake2.nim @@ -1,10 +1,73 @@ +import std/bitops, std/endians + type - Blake2b* = object - hash: array[8, uint64] - offset: array[2, uint64] - buffer: array[128, uint8] - buffer_idx: uint8 - hash_size: uint8 + Blake2b* = object + hash: array[8, uint64] + offset: array[2, uint64] + buffer: array[128, uint8] + buffer_idx: uint8 + hash_size: uint8 + + Blake2bParams* = object + b: array[64, byte] + Blake2sParams* = object + b: array[32, byte] + Blake2Params* = Blake2bParams | Blake2sParams + +proc params(c: var Blake2b): ptr Blake2bParams = + cast[ptr Blake2bParams](c.hash.addr) + +proc `digestLen=`*(p: ptr Blake2bParams; x: range[1..64]) = + p.b[0] = (uint8)x +proc `digestLen=`*(p: ptr Blake2sParams; x: range[1..32]) = + p.b[0] = (uint8)x +proc `keyLen=`*(p: ptr Blake2bParams; x: range[0..64]) = + p.b[1] = (uint8)x +proc `keyLen=`*(p: ptr Blake2sParams; x: range[0..32]) = + p.b[1] = (uint8)x +proc `fanout=`*(p: ptr Blake2Params; x: Natural) = + p.b[2] = (uint8)x +proc `depth=`*(p: ptr Blake2Params; x: Natural) = + p.b[3] = (uint8)x + +proc `leafLength=`*(p: ptr Blake2Params; x: Natural) = + var x = x; littleEndian32(p.b[4].addr, x.addr) + +proc `nodeOffset=`*(p: ptr Blake2bParams; x: Natural) = + var x = x; littleEndian64(p.b[8].addr, x.addr) +proc `nodeOffset=`*(p: ptr Blake2sParams; x: Natural) = + var tmp: int64 + littleEndian64(tmp.addr, p.b[8].addr) + tmp = (tmp and 0xffffffff) or (x shl 32) + littleEndian64(p.b[8].addr, tmp.addr) + +proc `nodeDepth=`*(p: ptr Blake2bParams; x: Natural) = + p.b[16] = (uint8)x +proc `nodeDepth=`*(p: ptr Blake2sParams; x: Natural) = + p.b[14] = (uint8)x + +proc `innerLength=`*(p: ptr Blake2bParams; x: Natural) = + p.b[17] = (uint8)x +proc `innerLength=`*(p: ptr Blake2sParams; x: Natural) = + p.b[15] = (uint8)x + +proc `salt=`*(p: ptr Blake2bParams; salt: pointer; len: Natural) = + copyMem(p.b[32].addr, salt, min(len, 16)) +proc `salt=`*(p: ptr Blake2sParams; salt: pointer; len: Natural) = + copyMem(p.b[16].addr, salt, min(len, 8)) + +proc `personal=`*(p: ptr Blake2bParams; salt: pointer; len: Natural) = + copyMem(p.b[48].addr, salt, min(len, 16)) +proc `personal=`*(p: ptr Blake2sParams; salt: pointer; len: Natural) = + copyMem(p.b[24].addr, salt, min(len, 8)) + +proc init(p: ptr Blake2Params) = + when p is Blake2bParams: + p.digestLen = 64 + else: + p.digestLen = 32 + p.fanout = 1 + p.depth = 1 const Blake2bIV = [ 0x6a09e667f3bcc908'u64, 0xbb67ae8584caa73b'u64, @@ -33,20 +96,17 @@ proc inc(a: var array[2, uint64], b: uint8) = proc padding(a: var array[128, uint8], b: uint8) = for i in b..127: a[i] = 0 -proc ror64(x: uint64, n: int): uint64 {.inline.} = - result = (x shr n) or (x shl (64 - n)) - proc G (v: var array[16, uint64], a,b,c,d: int, x,y: uint64) {.inline.} = v[a] = v[a] + v[b] + x - v[d] = ror64(v[d] xor v[a], 32) + v[d] = rotateRightBits(v[d] xor v[a], 
32) v[c] = v[c] + v[d] - v[b] = ror64(v[b] xor v[c], 24) + v[b] = rotateRightBits(v[b] xor v[c], 24) v[a] = v[a] + v[b] + y - v[d] = ror64(v[d] xor v[a], 16) + v[d] = rotateRightBits(v[d] xor v[a], 16) v[c] = v[c] + v[d] - v[b] = ror64(v[b] xor v[c], 63) + v[b] = rotateRightBits(v[b] xor v[c], 63) proc compress(c: var Blake2b, last: int = 0) = var input, v: array[16, uint64] @@ -72,7 +132,7 @@ proc compress(c: var Blake2b, last: int = 0) = c.buffer_idx = 0 {.push boundChecks: off.} -proc blake2b_update*(c: var Blake2b, buf: pointer, data_size: int) = +proc update*(c: var Blake2b, buf: pointer, data_size: int) = var data = cast[ptr array[0, uint8]](buf) for i in 0..= 1'u8 and hash_size <= 64'u8) - assert(key_size >= 0 and key_size <= 64) - c.hash = Blake2bIV - c.hash[0] = c.hash[0] xor 0x01010000 xor cast[uint64](key_size shl 8) xor hash_size - c.hash_size = hash_size +proc initBlake2b*(key: pointer = nil, key_size: range[0..64] = 0): Blake2b = + init(result.params) + result.hash_size = result.params.b[0] + result.params.keyLen = keySize + for i in 0..7: + result.hash[i] = Blake2bIV[i] if key_size > 0: - blake2b_update(c, key, key_size) - padding(c.buffer, c.buffer_idx) - c.buffer_idx = 128 + update(result, key, key_size) + padding(result.buffer, result.buffer_idx) + result.buffer_idx = 128 -proc blake2b_final*(c: var Blake2b): seq[uint8] = +proc initBlake2b*(customize: proc(params: ptr Blake2bParams)): Blake2b = + let p = result.params + init(p) + customize(p) + result.hash_size = p.b[0] + for i in 0..7: + result.hash[0] = result.hash[0] xor Blake2bIV[i] + +proc finish*(c: var Blake2b): seq[uint8] = result = newSeq[uint8](c.hash_size) inc(c.offset, c.buffer_idx) padding(c.buffer, c.buffer_idx) compress(c, 1) for i in 0'u8.. 0) + +proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt): BlobStream = + ## Return a new `BlobStream` for reading a blob. + assert(not s.openBlobStreamImpl.isNil) + s.openBlobStreamImpl(s, id, size) + +proc openIngestStream*(s: BlobStore; size = 0.BiggestInt): IngestStream = + ## Return a new `IngestStream` for ingesting a blob. + assert(not s.openIngestStreamImpl.isNil) + s.openIngestStreamImpl(s, size) + +proc get*(s: BlobStore; cid: Cid): string = + ## Retrieve a raw block from the store. + result = "" + s.get(cid, result) + +proc putDag*(s: BlobStore; dag: CborNode): Cid = + ## Place an Dagfs node in the store. + var raw = encode dag + s.put raw + +proc getDag*(s: BlobStore; cid: Cid): CborNode = + ## Retrieve an CBOR DAG from the store. + let stream = newStringStream(s.get(cid)) + result = parseCbor stream + close stream + +type + FileStore* = ref FileStoreObj + ## A store that writes nodes and leafs as files. + FileStoreObj = object of BlobStoreObj + root, buf: string + +proc parentAndFile(fs: FileStore; cid: Cid): (string, string) = + ## Generate the parent path and file path of CID within the store. 
+ let digest = hex.encode(cid.data) + result[0] = fs.root / digest[0..1] + result[1] = result[0] / digest[2..digest.high] + +proc fsPutBuffer(s: BlobStore; buf: pointer; len: Natural): Cid = + var fs = FileStore(s) + result = dagHash(buf, len) + if result != zeroChunk: + let (dir, path) = fs.parentAndFile(result) + if not existsDir dir: + createDir dir + if not existsFile path: + fs.buf.setLen(len) + copyMem(addr fs.buf[0], buf, fs.buf.len) + let + tmp = fs.root / "tmp" + writeFile(tmp, fs.buf) + moveFile(tmp, path) + +proc fsPut(s: BlobStore; chunk: string): Cid = + var fs = FileStore(s) + result = dagHash chunk + if result != zeroChunk: + let (dir, path) = fs.parentAndFile(result) + if not existsDir dir: + createDir dir + if not existsFile path: + let + tmp = fs.root / "tmp" + writeFile(tmp, chunk) + moveFile(tmp, path) + +proc fsGetBuffer(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int = + var fs = FileStore(s) + let (_, path) = fs.parentAndFile cid + if existsFile path: + let fSize = path.getFileSize + if maxChunkSize < fSize: + discard tryRemoveFile path + raiseMissing cid + if len.int64 < fSize: + raise newException(BufferTooSmall, "file is $1 bytes, buffer is $2" % [$fSize, $len]) + let file = open(path, fmRead) + result = file.readBuffer(buf, len) + close file + if result == 0: + raiseMissing cid + +proc fsGet(s: BlobStore; cid: Cid; result: var string) = + var fs = FileStore(s) + let (_, path) = fs.parentAndFile cid + if existsFile path: + let fSize = path.getFileSize + if fSize > maxChunkSize: + discard tryRemoveFile path + raiseMissing cid + result.setLen fSize.int + let + file = open(path, fmRead) + n = file.readChars(result, 0, result.len) + close file + doAssert(n == result.len) + else: + raiseMissing cid + +func compressTree(leaves: var seq[BlobId]) = + var + ctx: Blake2b256 + nodeOffset = 0 + nodeDepth = 0 + while leaves.len > 1: + nodeOffset = 0 + inc nodeDepth + var pos, next: int + while pos < leaves.len: + ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = nodeOffset + params.nodeDepth = nodeDepth + inc nodeOffset + ctx.update(leaves[pos].data) + inc pos + if pos < leaves.len: + ctx.update(leaves[pos].data) + inc pos + leaves[next] = ctx.finish() + inc next + leaves.setLen(next) + # TODO: BLAKE2 tree finalization flags + +iterator dumpBlob*(store: BlobStore; id: BlobId): string = + var + stream = store.openBlobStream(id) + buf = newString(blobLeafSize) + defer: + close stream + while true: + buf.setLen(blobLeafSize) + let n = stream.read(buf[0].addr, buf.len) + if n == 0: + break + buf.setLen(n) + yield buf + +proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] = + ## Ingest a file and return blob metadata. 
+ let + file = openAsync(path, fmRead) + fileSize = file.getFileSize + defer: + close file + let stream = store.openIngestStream(fileSize) + if fileSize > 0: + var buf = newString(min(blobLeafSize, fileSize)) + while true: + let n = waitFor file.readBuffer(buf[0].addr, buf.len) + if n == 0: break + stream.ingest(buf[0].addr, n) + result = finish stream + +proc commit*(store: BlobStore; bs: BlobSet): BlobSet = + assert(bs.kind == hotNode) + for e in bs.table.mitems: + case e.kind + of coldNode, leafNode: discard + of hotNode: + e = store.commit e + let stream = store.openIngestStream() + var buf = encode bs.toCbor + stream.ingest(buf) + let (id, _) = finish stream + result = BlobSet(kind: coldNode, setId: id) + +type + FsBlobStream = ref FsBlobStreamObj + FsBlobStreamObj = object of BlobStreamObj + path: string + file: AsyncFile + + NullIngestStream = ref NullIngestStreamObj + NullIngestStreamObj = object of IngestStreamObj + ctx: Blake2b256 + leaves: seq[BlobId] + pos, nodeOffset: BiggestInt + + FsIngestStream = ref FsIngestStreamObj + FsIngestStreamObj = object of IngestStreamObj + ctx: Blake2b256 + leaves: seq[BlobId] + path: string + file: AsyncFile + pos, nodeOffset: BiggestInt + +proc nullBlobClose(s: BlobStream) = discard + +proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0 + +proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream = + BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead) + +proc fsBlobClose(s: BlobStream) = + var s = FsBlobStream(s) + close s.file + +proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = + var s = FsBlobStream(s) + result = waitFor s.file.readBuffer(buffer, len) + +proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt): BlobStream = + var fs = FileStore(s) + let stream = FsBlobStream() + result = stream + stream.path = fs.root / $id + stream.file = openAsync(stream.path, fmRead) + stream.closeImpl = fsBlobClose + stream.readImpl = fsBlobRead + +proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + var s = FsIngestStream(s) + close s.file + s.leaves.add finish(s.ctx) + compressTree(s.leaves) + result.id = s.leaves[0] + result.size = s.pos + moveFile(s.path, s.path.parentDir / $(result.id)) + +proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + var s = NullIngestStream(s) + s.leaves.add finish(s.ctx) + compressTree(s.leaves) + result.id = s.leaves[0] + result.size = s.pos + +proc nullIngest(s: IngestStream; buf: pointer; len: Natural) = + var + s = NullIngestStream(s) + off = 0 + buf = cast[ptr array[blobLeafSize, byte]](buf) + while off < len: + var n = min(blobLeafSize, len-off) + let leafOff = int(s.pos and blobLeafSizeMask) + if leafOff == 0: + if s.pos > 0: + s.leaves.add finish(s.ctx) + s.ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = s.nodeOffset + inc s.nodeOffset + else: + n = min(n, blobLeafSize-leafOff) + s.ctx.update(buf[off].addr, n) + off.inc n + s.pos.inc n + +proc fsIngest(s: IngestStream; buf: pointer; len: Natural) = + var + s = FsIngestStream(s) + off = 0 + buf = cast[ptr array[blobLeafSize, byte]](buf) + while off < len: + var n = min(blobLeafSize, len-off) + let leafOff = int(s.pos and blobLeafSizeMask) + if leafOff == 0: + if s.pos > 0: + s.leaves.add finish(s.ctx) + s.ctx.init do (params: var Blake2bParams): + params.fanout = 2 + params.depth = 255 + params.leafLength = blobLeafSize + params.nodeOffset = s.nodeOffset + 
inc s.nodeOffset + else: + n = min(n, blobLeafSize-leafOff) + s.ctx.update(buf[off].addr, n) + waitFor s.file.writeBuffer(buf[off].addr, n) + off.inc n + s.pos.inc n + +proc nullOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream = + NullIngestStream( + finishImpl: nullFinish, ingestImpl: nullIngest, leaves: newSeq[BlobId]()) + +proc fsOpenIngestStream(s: BlobStore; size: BiggestInt): IngestStream = + var fs = FileStore(s) + let stream = FsIngestStream() + result = stream + stream.finishImpl = fsFinish + stream.ingestImpl = fsIngest + stream.path = fs.root / "ingest" + stream.file = openAsync(stream.path, fmWrite) + if size > 0: + stream.file.setFileSize(size) + stream.leaves = newSeqOfCap[BlobId](leafCount size) + else: + stream.leaves = newSeq[BlobId]() + +proc newNullStore*(): BlobStore = + BlobStore( + openBlobStreamImpl: nullOpenBlobStream, + openIngestStreamImpl: nullOpenIngestStream) + +proc newFileStore*(root: string): FileStore = + if not existsDir(root): + createDir root + new result + result.putBufferImpl = fsPutBuffer + result.putImpl = fsPut + result.getBufferImpl = fsGetBuffer + result.getImpl = fsGet + result.openBlobStreamImpl = fsOpenBlobStream + result.openIngestStreamImpl = fsOpenIngestStream + result.root = root + result.buf = "" diff --git a/src/dagfs/tcp.nim b/src/blobsets/tcp.nim similarity index 94% rename from src/dagfs/tcp.nim rename to src/blobsets/tcp.nim index 9500276..d61bc8c 100644 --- a/src/dagfs/tcp.nim +++ b/src/blobsets/tcp.nim @@ -1,5 +1,5 @@ import std/asyncnet, std/asyncdispatch, std/streams -import ../dagfs, ./stores +import ../blobsets, ./stores const defaultPort = Port(1023) @@ -55,9 +55,9 @@ type TcpServer* = ref TcpServerObj TcpServerObj = object sock: AsyncSocket - store: DagfsStore + store: BlobStore -proc newTcpServer*(store: DagfsStore; port = defaultPort): TcpServer = +proc newTcpServer*(store: BlobStore; port = defaultPort): TcpServer = ## Create a new TCP server that serves `store`. 
result = TcpServer(sock: newAsyncSocket(buffered=true), store: store) result.sock.bindAddr(port) @@ -153,11 +153,11 @@ proc close*(server: TcpServer) = type TcpClient* = ref TcpClientObj - TcpClientObj = object of DagfsStoreObj + TcpClientObj = object of BlobStoreObj sock: AsyncSocket buf: string -proc tcpClientPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid = +proc tcpClientPutBuffer(s: BlobStore; buf: pointer; len: Natural): Cid = var client = TcpClient(s) result = dagHash(buf, len) if result != zeroChunk: @@ -186,7 +186,7 @@ proc tcpClientPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid = else: raiseAssert "invalid server message" -proc tcpClientPut(s: DagfsStore; chunk: string): Cid = +proc tcpClientPut(s: BlobStore; chunk: string): Cid = var client = TcpClient(s) result = dagHash chunk if result != zeroChunk: @@ -215,7 +215,7 @@ proc tcpClientPut(s: DagfsStore; chunk: string): Cid = else: raiseAssert "invalid server message" -proc tcpClientGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int = +proc tcpClientGetBuffer(s: BlobStore; cid: Cid; buf: pointer; len: Natural): int = var client = TcpClient(s) msg: Message @@ -242,7 +242,7 @@ proc tcpClientGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): in else: raiseMissing cid -proc tcpClientGet(s: DagfsStore; cid: Cid; result: var string) = +proc tcpClientGet(s: BlobStore; cid: Cid; result: var string) = result.setLen maxChunkSize let n = s.getBuffer(cid, result[0].addr, result.len) result.setLen n diff --git a/src/dagfs.nim b/src/dagfs.nim deleted file mode 100644 index dfa85ff..0000000 --- a/src/dagfs.nim +++ /dev/null @@ -1,153 +0,0 @@ -import std/hashes, std/streams, std/strutils -import base58/bitcoin, cbor, siphash -import ./dagfs/priv/hex, ./dagfs/priv/blake2 - -const - maxChunkSize* = 1 shl 18 - ## Maximum supported chunk size. - digestLen* = 32 - ## Length of a chunk digest. - cidSize* = digestLen - ## Size of CID object in memory - -type Cid* = object - ## Chunk IDentifier - digest*: array[digestLen, uint8] - -proc initCid*(): Cid = Cid() - ## Initialize an invalid CID. - -proc isValid*(x: Cid): bool = - ## Check that a CID has been properly initialized. - for c in x.digest.items: - if c != 0: return true - -proc `==`*(x, y: Cid): bool = - ## Compare two CIDs. - for i in 0.. 0) - -proc get*(s: DagfsStore; cid: Cid): string = - ## Retrieve a raw block from the store. - result = "" - s.get(cid, result) - -proc putDag*(s: DagfsStore; dag: CborNode): Cid = - ## Place an Dagfs node in the store. - var raw = encode dag - s.put raw - -proc getDag*(s: DagfsStore; cid: Cid): CborNode = - ## Retrieve an CBOR DAG from the store. - let stream = newStringStream(s.get(cid)) - result = parseCbor stream - close stream - -type - FileStore* = ref FileStoreObj - ## A store that writes nodes and leafs as files. - FileStoreObj = object of DagfsStoreObj - root, buf: string - -proc parentAndFile(fs: FileStore; cid: Cid): (string, string) = - ## Generate the parent path and file path of CID within the store. 
- let digest = hex.encode(cid.digest) - result[0] = fs.root / digest[0..1] - result[1] = result[0] / digest[2..digest.high] - -proc fsPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid = - var fs = FileStore(s) - result = dagHash(buf, len) - if result != zeroChunk: - let (dir, path) = fs.parentAndFile(result) - if not existsDir dir: - createDir dir - if not existsFile path: - fs.buf.setLen(len) - copyMem(addr fs.buf[0], buf, fs.buf.len) - let - tmp = fs.root / "tmp" - writeFile(tmp, fs.buf) - moveFile(tmp, path) - -proc fsPut(s: DagfsStore; chunk: string): Cid = - var fs = FileStore(s) - result = dagHash chunk - if result != zeroChunk: - let (dir, path) = fs.parentAndFile(result) - if not existsDir dir: - createDir dir - if not existsFile path: - let - tmp = fs.root / "tmp" - writeFile(tmp, chunk) - moveFile(tmp, path) - -proc fsGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int = - var fs = FileStore(s) - let (_, path) = fs.parentAndFile cid - if existsFile path: - let fSize = path.getFileSize - if maxChunkSize < fSize: - discard tryRemoveFile path - raiseMissing cid - if len.int64 < fSize: - raise newException(BufferTooSmall, "file is $1 bytes, buffer is $2" % [$fSize, $len]) - let file = open(path, fmRead) - result = file.readBuffer(buf, len) - close file - if result == 0: - raiseMissing cid - -proc fsGet(s: DagfsStore; cid: Cid; result: var string) = - var fs = FileStore(s) - let (_, path) = fs.parentAndFile cid - if existsFile path: - let fSize = path.getFileSize - if fSize > maxChunkSize: - discard tryRemoveFile path - raiseMissing cid - result.setLen fSize.int - let - file = open(path, fmRead) - n = file.readChars(result, 0, result.len) - close file - doAssert(n == result.len) - else: - raiseMissing cid - -proc newFileStore*(root: string): FileStore = - ## Blocks retrieved by `get` are not hashed and verified. - if not existsDir(root): - createDir root - new result - result.putBufferImpl = fsPutBuffer - result.putImpl = fsPut - result.getBufferImpl = fsGetBuffer - result.getImpl = fsGet - result.root = root - result.buf = ""
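
A minimal usage sketch of the store API added above, assuming the package is built from this patch and using the same /tmp/blobs root as the dump and ingest subcommands; the input file name is a placeholder:

  import blobsets, blobsets/stores

  let
    store = newFileStore("/tmp/blobs")            # FileStore root, created if missing
    (id, size) = store.ingestFile("example.dat")  # placeholder path; returns blob id and size
  echo $id, " ", size                             # id prints in the braille visual form
  for chunk in store.dumpBlob(id):                # stream the stored leaves back out
    stdout.write chunk

ingestFile chunks the file into blobLeafSize leaves and hashes them with the BLAKE2b tree parameters, and dumpBlob replays those leaves in order; the ingest and dump subcommands in blobset.nim are thin wrappers over these two calls.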