Initial HTTP store
This commit is contained in:
parent
eac70a7dee
commit
219730dad8
|
@ -1,2 +1,3 @@
|
|||
/blobset
|
||||
/tests/test_set
|
||||
/tests/test_http
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
when not isMainModule:
|
||||
{.error: "this module is not a library, import blobsets instead".}
|
||||
|
||||
import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin
|
||||
import std/asyncdispatch, std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin
|
||||
import cbor
|
||||
import ./blobsets, ./blobsets/filestores
|
||||
import ./blobsets, ./blobsets/filestores,
|
||||
./blobsets/httpservers, ./blobsets/httpstores
|
||||
|
||||
# Basic Spry
|
||||
import spryvm/spryvm
|
||||
|
||||
# Spry extra modules, as much as possible!
|
||||
# Spry extra modules
|
||||
import spryvm/sprycore, spryvm/spryextend, spryvm/sprymath, spryvm/spryos, spryvm/spryio,
|
||||
spryvm/spryoo, spryvm/sprystring, spryvm/sprymodules, spryvm/spryreflect, spryvm/sprymemfile,
|
||||
spryvm/spryblock,
|
||||
|
@ -24,13 +25,23 @@ when defined(readLine):
|
|||
#else:
|
||||
# import ./blobsets/tcp
|
||||
|
||||
proc openStore(): BlobStore =
|
||||
#newFileStore("/tmp/blobs")
|
||||
newHttpStore("http://127.0.0.1:8080/")
|
||||
|
||||
proc serverMain() =
|
||||
let
|
||||
store = newFileStore("/tmp/blobs")
|
||||
server = newHttpStoreServer(store)
|
||||
waitFor server.serve((Port)8080)
|
||||
|
||||
proc dumpMain() =
|
||||
var args = newSeq[string]()
|
||||
for kind, key, val in getopt():
|
||||
if kind == cmdArgument:
|
||||
args.add key
|
||||
if args.len > 1:
|
||||
let store = newFileStore("/tmp/blobs")
|
||||
let store = openStore()
|
||||
for i in 1..args.high:
|
||||
try:
|
||||
for chunk in store.dumpBlob(args[i].toBlobId):
|
||||
|
@ -64,7 +75,7 @@ proc ingestMain() =
|
|||
args.add key
|
||||
if args.len > 1:
|
||||
var set = newBlobSet()
|
||||
let store = newFileStore("/tmp/blobs")
|
||||
let store = openStore()
|
||||
for i in 1..args.high:
|
||||
let path = normalizedPath args[i]
|
||||
set = store.insertPath(set, path.getFileInfo.kind, path)
|
||||
|
@ -674,7 +685,7 @@ proc replMain() =
|
|||
scripted = true
|
||||
let
|
||||
#store = openStore()
|
||||
store = newFileStore("/tmp/blobs")
|
||||
store = openStore()
|
||||
env = newEnv(store)
|
||||
outStream = stdout.newFileStream
|
||||
readLine = if scripted: readLineSimple else: readLineFromStdin
|
||||
|
@ -784,6 +795,7 @@ proc main() =
|
|||
of "repl": replMain()
|
||||
of "dump": dumpMain()
|
||||
of "ingest": ingestMain()
|
||||
of "server": serverMain()
|
||||
of "spry": spryMain()
|
||||
else: quit("no such subcommand ")
|
||||
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# Disable this to use only primitive stdin
|
||||
-d:readLine
|
||||
--nilseqs:on
|
|
@ -394,9 +394,17 @@ type
|
|||
BlobKind* = enum
|
||||
dataBlob, metaBlob
|
||||
|
||||
proc `$`*(k: BlobKind): string =
|
||||
case k
|
||||
of dataBlob: "data"
|
||||
of metaBlob: "meta"
|
||||
|
||||
type
|
||||
BlobStream* = ref BlobStreamObj
|
||||
BlobStreamObj* = object of RootObj
|
||||
closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.}
|
||||
setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.}
|
||||
getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
|
||||
readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.}
|
||||
IngestStream* = ref IngestStreamObj
|
||||
IngestStreamObj* = object of RootObj
|
||||
|
@ -407,6 +415,14 @@ proc close*(s: BlobStream) =
|
|||
assert(not s.closeImpl.isNil)
|
||||
s.closeImpl(s)
|
||||
|
||||
proc `pos=`*(s: BlobStream; pos: BiggestInt) =
|
||||
assert(not s.setPosImpl.isNil)
|
||||
s.setPosImpl(s, pos)
|
||||
|
||||
proc pos*(s: BlobStream): BiggestInt =
|
||||
assert(not s.getPosImpl.isNil)
|
||||
s.getPosImpl(s)
|
||||
|
||||
proc read*(s: BlobStream; buf: pointer; len: Natural): int =
|
||||
assert(not s.readImpl.isNil)
|
||||
result = s.readImpl(s, buf, len)
|
||||
|
@ -426,6 +442,13 @@ proc ingest*(s: IngestStream; buf: var string) =
|
|||
assert(not s.ingestImpl.isNil)
|
||||
s.ingestImpl(s, buf[0].addr, buf.len)
|
||||
|
||||
proc ingest*(s: IngestStream; buf: string) {.deprecated.} =
|
||||
## Ingest stream
|
||||
# TODO: zero-copy
|
||||
assert(not s.ingestImpl.isNil)
|
||||
var buf = buf
|
||||
s.ingestImpl(s, buf[0].addr, buf.len)
|
||||
|
||||
type
|
||||
BlobStore* = ref BlobStoreObj
|
||||
BlobStoreObj* = object of RootObj
|
||||
|
@ -446,10 +469,17 @@ type
|
|||
|
||||
proc nullBlobClose(s: BlobStream) = discard
|
||||
|
||||
proc setPosNull(s: BlobStream; pos: BiggestInt) = discard
|
||||
proc getPosNull(s: BlobStream): BiggestInt = discard
|
||||
|
||||
proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0
|
||||
|
||||
proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||
BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead)
|
||||
BlobStream(
|
||||
closeImpl: nullBlobClose,
|
||||
setPosImpl: setPosNull,
|
||||
getPosImpl: getPosNull,
|
||||
readImpl: nullBlobRead)
|
||||
|
||||
proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||
var s = NullIngestStream(s)
|
||||
|
|
|
@ -43,22 +43,29 @@ proc fsBlobClose(s: BlobStream) =
|
|||
var s = FsBlobStream(s)
|
||||
close s.file
|
||||
|
||||
proc setPosFs(s: BlobStream; pos: BiggestInt) =
|
||||
var s = FsBlobStream(s)
|
||||
s.file.setFilePos (int64)pos
|
||||
|
||||
proc getPosFs(s: BlobStream): BiggestInt =
|
||||
var s = FsBlobStream(s)
|
||||
(BiggestInt)s.file.getFilePos
|
||||
|
||||
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
|
||||
var s = FsBlobStream(s)
|
||||
result = waitFor s.file.readBuffer(buffer, len)
|
||||
|
||||
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||
var fs = FileStore(s)
|
||||
let stream = FsBlobStream()
|
||||
result = stream
|
||||
case kind
|
||||
of dataBlob:
|
||||
stream.path = fs.root / "data" / $id
|
||||
of metaBlob:
|
||||
stream.path = fs.root / "blob" / $id
|
||||
stream.file = openAsync(stream.path, fmRead)
|
||||
stream.closeImpl = fsBlobClose
|
||||
stream.readImpl = fsBlobRead
|
||||
let path = fs.root / $kind / $id
|
||||
FsBlobStream(
|
||||
closeImpl: fsBlobClose,
|
||||
setPosImpl: setPosFs,
|
||||
getPosImpl: getPosFs,
|
||||
readImpl: fsBlobRead,
|
||||
path: path,
|
||||
file: openAsync(path, fmRead),
|
||||
)
|
||||
|
||||
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||
var s = FsIngestStream(s)
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
import std/asyncdispatch, std/asynchttpserver, std/parseutils, std/random, std/net, std/strutils, std/tables
|
||||
import ../blobsets
|
||||
|
||||
export net.Port
|
||||
|
||||
type
|
||||
HttpStoreServer = ref object
|
||||
server: AsyncHttpServer
|
||||
store: BlobStore
|
||||
ingests: Table[string, IngestStream]
|
||||
blobs: Table[BlobId, BlobStream]
|
||||
# TODO: tables must be purged periodically
|
||||
rng: Rand
|
||||
|
||||
proc newHttpStoreServer*(backend: BlobStore): HttpStoreServer =
|
||||
randomize()
|
||||
HttpStoreServer(
|
||||
server: newAsyncHttpServer(),
|
||||
store: backend,
|
||||
ingests: initTable[string, IngestStream](),
|
||||
blobs: initTable[BlobId, BlobStream](),
|
||||
rng: initRand(random(high int)))
|
||||
|
||||
func parseRange(range: string): tuple[a: int, b: int] =
|
||||
## Parse an HTTP byte range string.
|
||||
var start = skip(range, "bytes=")
|
||||
if start > 0:
|
||||
start.inc parseInt(range, result.a, start)
|
||||
if skipWhile(range, {'-'}, start) == 1:
|
||||
discard parseInt(range, result.b, start+1)
|
||||
if result.b < result.a:
|
||||
reset result
|
||||
|
||||
proc head(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
var stream: BlobStream
|
||||
let elems = req.url.path.split '/'
|
||||
if elems.len == 3:
|
||||
let
|
||||
kind = case elems[1]
|
||||
of "data": dataBlob
|
||||
of "meta": metaBlob
|
||||
else: raise newException(ValueError, "bad GET path " & req.url.path)
|
||||
blob = toBlobId elems[2]
|
||||
var stream = hss.blobs.getOrDefault blob
|
||||
if stream.isNil:
|
||||
stream = hss.store.openBlobStream(blob, kind=kind)
|
||||
hss.blobs[blob] = stream
|
||||
req.respond(Http200, "")
|
||||
|
||||
proc get(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
var stream: BlobStream
|
||||
let elems = req.url.path.split '/'
|
||||
if elems.len == 3:
|
||||
let
|
||||
kind = case elems[1]
|
||||
of "data": dataBlob
|
||||
of "meta": metaBlob
|
||||
else: raise newException(ValueError, "bad GET path " & req.url.path)
|
||||
blob = toBlobId elems[2]
|
||||
var stream = hss.blobs.getOrDefault blob
|
||||
if stream.isNil:
|
||||
stream = hss.store.openBlobStream(blob, kind=kind)
|
||||
hss.blobs[blob] = stream
|
||||
let
|
||||
(pos, endPos) = parseRange req.headers["range"]
|
||||
len = endPos - pos
|
||||
stream.pos = pos
|
||||
var body = newString(len)
|
||||
let n = stream.read(body[0].addr, len)
|
||||
body.setLen n
|
||||
return req.respond(Http206, body)
|
||||
req.respond(Http404, "invalid request")
|
||||
|
||||
proc putIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
let stream = hss.ingests[req.url.path]
|
||||
stream.ingest(req.body)
|
||||
req.respond(Http204, "")
|
||||
|
||||
proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
case req.url.path
|
||||
of "/ingest/data":
|
||||
let
|
||||
size = parseBiggestInt req.headers["ingest-size"]
|
||||
key = "/ingest/" & $hss.rng.next
|
||||
headers = newHttpHeaders({"Location": key})
|
||||
hss.ingests[key] = hss.store.openIngestStream(size, dataBlob)
|
||||
req.respond(Http201, "", headers)
|
||||
of "/ingest/meta":
|
||||
let
|
||||
size = parseBiggestInt req.headers["ingest-size"]
|
||||
key = "/ingest/" & $hss.rng.next
|
||||
headers = newHttpHeaders({"Location": key})
|
||||
hss.ingests[key] = hss.store.openIngestStream(size, metaBlob)
|
||||
req.respond(Http201, "", headers)
|
||||
else:
|
||||
let
|
||||
stream = hss.ingests[req.url.path]
|
||||
(blob, size) = finish stream
|
||||
headers = newHttpHeaders({
|
||||
"blob-id": blob.toHex,
|
||||
"blob-size": $size,
|
||||
})
|
||||
hss.ingests.del req.url.path
|
||||
req.respond(Http204, "", headers)
|
||||
|
||||
proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
|
||||
proc handleRequest(req: Request): Future[void] =
|
||||
case req.reqMethod
|
||||
of HttpGET:
|
||||
get(hss, req)
|
||||
of HttpHEAD:
|
||||
head(hss, req)
|
||||
of HttpPUT:
|
||||
putIngest(hss, req)
|
||||
of HttpPOST:
|
||||
postIngest(hss, req)
|
||||
else:
|
||||
req.respond(Http501, "method not implemented")
|
||||
hss.server.serve(port, handleRequest)
|
|
@ -0,0 +1,108 @@
|
|||
import std/asyncdispatch, std/httpclient, std/strutils, std/uri
|
||||
import ../blobsets
|
||||
|
||||
type
|
||||
HttpBlobStream = ref HttpBlobStreamObj
|
||||
HttpBlobStreamObj = object of BlobStreamObj
|
||||
client: AsyncHttpClient
|
||||
url: string
|
||||
rangePos: BiggestInt
|
||||
|
||||
HttpIngestStream = ref HttpIngestStreamObj
|
||||
HttpIngestStreamObj = object of IngestStreamObj
|
||||
client: AsyncHttpClient
|
||||
url: string
|
||||
ctx: Blake2b256
|
||||
leaves: seq[BlobId]
|
||||
leaf: string
|
||||
buffOff: int
|
||||
pos, nodeOffset: BiggestInt
|
||||
|
||||
HttpStore* = ref HttpStoreObj
|
||||
HttpStoreObj = object of BlobStoreObj
|
||||
url: Uri
|
||||
|
||||
proc httpBlobClose(s: BlobStream) =
|
||||
var s = (HttpBlobStream)s
|
||||
close s.client
|
||||
|
||||
proc setPosHttp(s: BlobStream; pos: BiggestInt) =
|
||||
var s = (HttpBlobStream)s
|
||||
s.rangePos = pos
|
||||
|
||||
proc getPosHttp(s: BlobStream): BiggestInt =
|
||||
var s = (HttpBlobStream)s
|
||||
s.rangePos
|
||||
|
||||
proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
|
||||
assert(not buffer.isNil)
|
||||
var s = (HttpBlobStream)s
|
||||
let
|
||||
headers = newHttpHeaders({"range": "bytes=$1-$2" % [ $s.rangePos, $(s.rangePos+len) ]})
|
||||
resp = waitFor s.client.request(s.url, HttpGET, headers=headers)
|
||||
var body = waitFor resp.body
|
||||
assert(not body.isNil)
|
||||
result = (int)min(body.len, len)
|
||||
if result > 0:
|
||||
copyMem(buffer, body[0].addr, result)
|
||||
s.rangePos.inc result
|
||||
|
||||
proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||
var store = HttpStore(store)
|
||||
let stream = HttpBlobStream(
|
||||
client: newAsyncHttpClient(),
|
||||
closeImpl: httpBlobClose,
|
||||
setPosImpl: setPosHttp,
|
||||
getPosImpl: getPosHttp,
|
||||
readImpl: httpBlobRead,
|
||||
url: $((store.url / $kind) / id.toHex)
|
||||
)
|
||||
let resp = waitFor stream.client.head(stream.url)
|
||||
assert(resp.code in {Http200, Http204}, resp.status)
|
||||
stream
|
||||
|
||||
proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||
var s = HttpIngestStream(s)
|
||||
let resp = waitFor s.client.request($s.url, HttpPOST)
|
||||
if not resp.code.is2xx:
|
||||
raiseAssert(resp.status)
|
||||
result.id = toBlobId resp.headers["blob-id"]
|
||||
result.size = parseBiggestInt resp.headers["blob-size"]
|
||||
|
||||
proc httpIngest(x: IngestStream; buf: pointer; len: Natural) =
|
||||
var
|
||||
s = HttpIngestStream(x)
|
||||
body = newString(len)
|
||||
copyMem(body[0].addr, buf, body.len)
|
||||
# TODO: zero-copy
|
||||
let resp = waitFor s.client.request($s.url, HttpPUT, body)
|
||||
if resp.code != Http204:
|
||||
raiseAssert(resp.status)
|
||||
|
||||
proc httpOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
|
||||
var store = HttpStore(s)
|
||||
let
|
||||
client = newAsyncHttpClient()
|
||||
url = (store.url / "ingest") / $kind
|
||||
headers = newHttpHeaders({"ingest-size": $size})
|
||||
resp = waitFor client.request($url, HttpPOST, headers=headers)
|
||||
if resp.code == Http201:
|
||||
var ingestUrl = parseUri resp.headers["Location"]
|
||||
if not ingestUrl.isAbsolute:
|
||||
ingestUrl = combine(store.url, ingestUrl)
|
||||
result = HttpIngestStream(
|
||||
finishImpl: httpFinish,
|
||||
ingestImpl: httpIngest,
|
||||
client: client,
|
||||
url: $ingestUrl,
|
||||
leaf: newString(min(size.int, blobLeafSize)),
|
||||
)
|
||||
else:
|
||||
raiseAssert(resp.status)
|
||||
|
||||
proc newHttpStore*(url: string): HttpStore =
|
||||
HttpStore(
|
||||
url: parseUri url,
|
||||
openBlobStreamImpl: httpOpenBlobStream,
|
||||
openIngestStreamImpl: httpOpenIngestStream,
|
||||
)
|
|
@ -0,0 +1,26 @@
|
|||
import std/asyncdispatch, std/net, std/random, std/strutils, std/unittest
|
||||
|
||||
import ../src/blobsets, ../src/blobsets/filestores, ../src/blobsets/httpstores, ../src/blobsets/httpservers
|
||||
|
||||
suite "Http store":
|
||||
randomize()
|
||||
|
||||
let
|
||||
port = (Port)rand(1 shl 15)
|
||||
store = newNullStore()
|
||||
server = newHttpStoreServer(store)
|
||||
asyncCheck server.serve(port)
|
||||
|
||||
let
|
||||
url = "http://127.0.0.1:$1/" % $port
|
||||
client = newHttpStore url
|
||||
|
||||
var
|
||||
blob: BlobId
|
||||
size: BiggestInt
|
||||
|
||||
test "ingest":
|
||||
(blob, size) = client.ingestFile("tests/test_http.nim")
|
||||
test "dump":
|
||||
for chunk in store.dumpBlob(blob):
|
||||
echo chunk
|
Loading…
Reference in New Issue