2018-12-24 21:19:03 +01:00
|
|
|
import ../blobsets
|
|
|
|
|
|
|
|
import std/asyncfile, std/asyncdispatch, std/os
|
|
|
|
|
|
|
|
import nimcrypto/blake2
|
|
|
|
|
|
|
|
proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] =
  ## Ingest the file at `path` into `store` as a data blob and return the
  ## `(id, size)` tuple produced by finishing the ingest stream.
  ##
  ## The file is read in chunks of at most `blobLeafSize` bytes. An empty file
  ## skips the read loop and the stream is finished without any data.
  let file = openAsync(path, fmRead)
  # Register the cleanup right after the open so the handle is released even
  # when querying the file size below raises.
  defer:
    close file
  let
    fileSize = file.getFileSize
    stream = store.openIngestStream(fileSize, dataBlob)
  if fileSize > 0:
    var buf = newString(min(blobLeafSize, fileSize))
    while true:
      let n = waitFor file.readBuffer(buf[0].addr, buf.len)
      # A zero-length read ends the loop.
      if n == 0: break
      stream.ingest(buf[0].addr, n)
  result = finish stream
|
|
|
|
|
|
|
|
type
  FsBlobStream = ref FsBlobStreamObj
    ## Read stream over a blob stored as a file.
  FsBlobStreamObj = object of BlobStreamObj
    path: string
      ## Location of the blob file being read.
    file: AsyncFile
      ## Handle of the blob file, opened for reading.

  FsIngestStream = ref FsIngestStreamObj
    ## Write stream that hashes data while spooling it to a file.
  FsIngestStreamObj = object of IngestStreamObj
    ctx: Blake2b256
      ## Hash context of the leaf currently being filled.
    leaves: seq[BlobId]
      ## Digests of the leaves finished so far.
    path: string
      ## Location of the file the ingested data is written to.
    file: AsyncFile
      ## Handle of the file the ingested data is written to.
    pos: BiggestInt
      ## Number of bytes ingested so far.
    nodeOffset: BiggestInt
      ## Value passed as the Blake2b node offset of the next leaf.

  FileStore* = ref FileStoreObj
    ## A store that writes nodes and leaves as files.
  FileStoreObj = object of BlobStoreObj
    root: string
      ## Directory under which the "data" and "blob" directories live.
    buf: string
      ## Scratch string; set to "" by `newFileStore`.
|
|
|
|
|
|
|
|
proc fsBlobClose(s: BlobStream) =
  ## Close the file backing the file-store blob stream `s`.
  let blob = FsBlobStream(s)
  blob.file.close()
|
|
|
|
|
|
|
|
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
  ## Read from the file backing `s` into `buffer`, requesting `len` bytes,
  ## and return the count reported by `readBuffer`. The asynchronous read is
  ## driven to completion with `waitFor` before returning.
  let blob = FsBlobStream(s)
  let pending = blob.file.readBuffer(buffer, len)
  result = waitFor pending
|
|
|
|
|
|
|
|
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
  ## Open the stored blob `id` of the given `kind` for reading.
  ##
  ## Data blobs are looked up under `<root>/data`, meta blobs under
  ## `<root>/blob`; the file name is the string form of `id`. The `size`
  ## argument is not used here.
  let
    store = FileStore(s)
    blob = FsBlobStream()
    subdir =
      case kind
      of dataBlob: "data"
      of metaBlob: "blob"
  blob.path = store.root / subdir / $id
  blob.file = openAsync(blob.path, fmRead)
  blob.closeImpl = fsBlobClose
  blob.readImpl = fsBlobRead
  result = blob
|
|
|
|
|
|
|
|
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
  ## Finish the ingest stream `s`: close its file, finalize the last leaf
  ## digest, reduce the leaf list with `compressTree`, and rename the file
  ## after the resulting id within the same directory.
  ##
  ## Returns the id together with the number of bytes ingested.
  ##
  ## NOTE(review): `ctx` is only initialised inside `fsIngest`; when nothing
  ## was ingested the context finished here was never initialised — confirm
  ## that the id produced for empty streams is the intended one.
  let ingested = FsIngestStream(s)
  ingested.file.close()
  # The leaf still being hashed has to join the list before the reduction.
  ingested.leaves.add finish(ingested.ctx)
  compressTree(ingested.leaves)
  let rootId = ingested.leaves[0]
  result.id = rootId
  result.size = ingested.pos
  let target = ingested.path.parentDir / $rootId
  moveFile(ingested.path, target)
|
|
|
|
|
|
|
|
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
  ## Append `len` bytes starting at `buf` to the ingest stream `s`.
  ##
  ## The input is consumed in pieces that never cross a leaf boundary
  ## (`blobLeafSize` bytes). Each piece is fed to the hash context of the
  ## current leaf and written to the stream's file. Whenever the stream
  ## position sits on a leaf boundary the previous leaf digest (if any) is
  ## stored in `leaves` and the context is re-initialised for the next leaf.
  let
    stream = FsIngestStream(s)
    # View the input as an unchecked byte array: `len` may exceed
    # `blobLeafSize`, and indexing a fixed-size `array[blobLeafSize, byte]`
    # past its end fails the bounds check in checked builds.
    data = cast[ptr UncheckedArray[byte]](buf)
  var off = 0
  while off < len:
    var n = min(blobLeafSize, len-off)
    let leafOff = int(stream.pos and blobLeafSizeMask)
    if leafOff == 0:
      # Starting a new leaf: keep the digest of the one just completed.
      if stream.pos > 0:
        stream.leaves.add finish(stream.ctx)
      stream.ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = stream.nodeOffset
      inc stream.nodeOffset
    else:
      # Mid-leaf: take no more than what is left of the current leaf.
      n = min(n, blobLeafSize-leafOff)
    stream.ctx.update(data[off].addr, n)
    waitFor stream.file.writeBuffer(data[off].addr, n)
    off.inc n
    stream.pos.inc n
|
|
|
|
|
|
|
|
proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
  ## Open a stream for ingesting a blob of the given `kind` into the store.
  ##
  ## Data is spooled to a file named "ingest" under `<root>/data` for data
  ## blobs or `<root>/blob` for meta blobs. When `size` is positive the file
  ## is resized to it up front and the leaf list is given capacity for
  ## `leafCount size` entries.
  let
    store = FileStore(s)
    ingesting = FsIngestStream()
  ingesting.finishImpl = fsFinish
  ingesting.ingestImpl = fsIngest
  let subdir =
    case kind
    of dataBlob: "data"
    of metaBlob: "blob"
  ingesting.path = store.root / subdir / "ingest"
  ingesting.file = openAsync(ingesting.path, fmWrite)
  if size <= 0:
    ingesting.leaves = newSeq[BlobId]()
  else:
    ingesting.file.setFileSize(size)
    ingesting.leaves = newSeqOfCap[BlobId](leafCount size)
  result = ingesting
|
|
|
|
|
|
|
|
proc newFileStore*(root = "/"): FileStore =
  ## Create a file-backed store rooted at `root`.
  ##
  ## The "data" and "blob" directories are created under `root` before the
  ## store object is set up with the file-based stream implementations.
  for subdir in ["data", "blob"]:
    createDir(root / subdir)
  result = FileStore(root: root, buf: "")
  result.openBlobStreamImpl = fsOpenBlobStream
  result.openIngestStreamImpl = fsOpenIngestStream
|