## filestores.nim — file-system backed blob stores for blobsets.
import ../blobsets
import std/asyncfile, std/asyncdispatch, std/os
import nimcrypto/blake2
proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] =
  ## Ingest the file at `path` into `store` and return the resulting
  ## blob identifier together with the file's size in bytes.
  let
    file = openAsync(path, fmRead)
    fileSize = file.getFileSize
  defer:
    close file
  let stream = store.openIngestStream(fileSize, dataBlob)
  if fileSize > 0:
    # Reuse one leaf-sized buffer for the whole file.
    var chunk = newString(min(blobLeafSize, fileSize))
    while true:
      let count = waitFor file.readBuffer(chunk[0].addr, chunk.len)
      if count == 0:
        break
      stream.ingest(chunk[0].addr, count)
  result = finish stream
type
  FsBlobStream = ref FsBlobStreamObj
  FsBlobStreamObj = object of BlobStreamObj
    ## Read stream over a blob stored as a plain file.
    path: string
    file: AsyncFile

  FsIngestStream = ref FsIngestStreamObj
  FsIngestStreamObj = object of IngestStreamObj
    ## Write stream that hashes leaves with BLAKE2b while spooling
    ## incoming bytes to a temporary file.
    ctx: Blake2b256
    leaves: seq[BlobId]
    path: string
    file: AsyncFile
    pos, nodeOffset: BiggestInt

  FileStore* = ref FileStoreObj
    ## A store that writes nodes and leafs as files.
  FileStoreObj = object of BlobStoreObj
    root, buf: string
proc fsBlobClose(s: BlobStream) =
  ## Close the file underlying a file-system blob stream.
  let stream = FsBlobStream(s)
  close stream.file
proc setPosFs(s: BlobStream; pos: BiggestInt) =
  ## Seek the underlying file to the absolute position `pos`.
  FsBlobStream(s).file.setFilePos(int64 pos)
proc getPosFs(s: BlobStream): BiggestInt =
  ## Return the current absolute position of the underlying file.
  BiggestInt FsBlobStream(s).file.getFilePos
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
  ## Read up to `len` bytes into `buffer`; returns the number of
  ## bytes actually read (0 at end of file).
  let stream = FsBlobStream(s)
  waitFor stream.file.readBuffer(buffer, len)
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
  ## Open a read stream over the blob `id` of the given `kind`.
  ## Raises `KeyError` when the blob file is missing or unreadable,
  ## which callers treat as "not in this store".
  var fs = FileStore(s)
  let path = fs.root / $kind / id.toHex
  try:
    let file = openAsync(path, fmRead)
    result = FsBlobStream(
      closeImpl: fsBlobClose,
      setPosImpl: setPosFs,
      getPosImpl: getPosFs,
      readImpl: fsBlobRead,
      path: path, file: file,
    )
  except CatchableError:
    # Narrowed from a bare `except:` so programming errors (Defects)
    # are not masked as a missing blob.
    raise newException(KeyError, "blob not in file-system store")
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
  ## Finalize an ingest: flush the final leaf hash, collapse the leaf
  ## hashes into a tree root, then rename the spool file to the
  ## resulting blob's hex identifier.
  let stream = FsIngestStream(s)
  close stream.file
  stream.leaves.add finish(stream.ctx)
  compressTree(stream.leaves)
  result.id = stream.leaves[0]
  result.size = stream.pos
  moveFile(stream.path, stream.path.parentDir / result.id.toHex)
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
  ## Append `len` bytes from `buf` to the ingest stream: hash them
  ## into the current BLAKE2b leaf context, re-keying the context at
  ## every `blobLeafSize` boundary, and spool them to the backing file.
  var stream = FsIngestStream(s)
  let bytes = cast[ptr array[blobLeafSize, byte]](buf)
  var off = 0
  while off < len:
    var n = min(blobLeafSize, len-off)
    let leafOff = int(stream.pos and blobLeafSizeMask)
    if leafOff == 0:
      # Starting a fresh leaf: close out the previous leaf's hash and
      # initialize the context with the next tree node offset.
      if stream.pos > 0:
        stream.leaves.add finish(stream.ctx)
      stream.ctx.init do (params: var Blake2bParams):
        params.fanout = 2
        params.depth = 255
        params.leafLength = blobLeafSize
        params.nodeOffset = stream.nodeOffset
      inc stream.nodeOffset
    else:
      # Mid-leaf: never hash past the current leaf boundary.
      n = min(n, blobLeafSize-leafOff)
    stream.ctx.update(bytes[off].addr, n)
    waitFor stream.file.writeBuffer(bytes[off].addr, n)
    off.inc n
    stream.pos.inc n
proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
  ## Open a write stream for ingesting a new blob of the given `kind`.
  ## When `size` is known (> 0) the spool file is pre-allocated and the
  ## leaf-hash sequence pre-sized. Raises `OSError` if the spool file
  ## cannot be created.
  var fs = FileStore(s)
  let stream = FsIngestStream(
    finishImpl: fsFinish,
    ingestImpl: fsIngest,
    path: fs.root / $kind / "ingest"
  )
  try:
    stream.file = openAsync(stream.path, fmWrite)
  except CatchableError:
    # Narrowed from a bare `except:` so Defects are not swallowed;
    # the original failure message is preserved in the OSError.
    raise newException(OSError,
      "failed to create ingest stream at '" & stream.path & "' " &
        getCurrentExceptionMsg())
  if size > 0:
    stream.file.setFileSize(size)
    stream.leaves = newSeqOfCap[BlobId](leafCount size)
  else:
    stream.leaves = newSeq[BlobId]()
  stream
proc newFileStore*(root: string): FileStore =
  ## Create a new store object backed by a file-system.
  ## The data and metadata directories are created on a best-effort
  ## basis; an unusable `root` surfaces later when streams are opened.
  try:
    createDir(root / $dataBlob)
    createDir(root / $metaBlob)
  except OSError:
    # Narrowed from a bare `except: discard` so Defects propagate;
    # directory-creation failures remain deliberately non-fatal here.
    discard
  new result
  result.openBlobStreamImpl = fsOpenBlobStream
  result.openIngestStreamImpl = fsOpenIngestStream
  result.root = root
  result.buf = ""