Improve error handling for HTTP server
This commit is contained in:
parent
88e634746f
commit
801d7c5ee0
|
@ -57,15 +57,19 @@ proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
|
|||
|
||||
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||
var fs = FileStore(s)
|
||||
let path = fs.root / $kind / $id
|
||||
FsBlobStream(
|
||||
closeImpl: fsBlobClose,
|
||||
setPosImpl: setPosFs,
|
||||
getPosImpl: getPosFs,
|
||||
readImpl: fsBlobRead,
|
||||
path: path,
|
||||
file: openAsync(path, fmRead),
|
||||
)
|
||||
try:
|
||||
let
|
||||
path = fs.root / $kind / id.toHex
|
||||
file = openAsync(path, fmRead)
|
||||
result = FsBlobStream(
|
||||
closeImpl: fsBlobClose,
|
||||
setPosImpl: setPosFs,
|
||||
getPosImpl: getPosFs,
|
||||
readImpl: fsBlobRead,
|
||||
path: path, file: file,
|
||||
)
|
||||
except:
|
||||
raise newException(KeyError, "blob not in file-system store")
|
||||
|
||||
proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||
var s = FsIngestStream(s)
|
||||
|
@ -74,7 +78,7 @@ proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
|||
compressTree(s.leaves)
|
||||
result.id = s.leaves[0]
|
||||
result.size = s.pos
|
||||
moveFile(s.path, s.path.parentDir / $(result.id))
|
||||
moveFile(s.path, s.path.parentDir / result.id.toHex)
|
||||
|
||||
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
|
||||
var
|
||||
|
@ -102,25 +106,27 @@ proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
|
|||
|
||||
proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
|
||||
var fs = FileStore(s)
|
||||
let stream = FsIngestStream()
|
||||
result = stream
|
||||
stream.finishImpl = fsFinish
|
||||
stream.ingestImpl = fsIngest
|
||||
case kind
|
||||
of dataBlob:
|
||||
stream.path = fs.root / "data" / "ingest"
|
||||
of metaBlob:
|
||||
stream.path = fs.root / "blob" / "ingest"
|
||||
stream.file = openAsync(stream.path, fmWrite)
|
||||
let stream = FsIngestStream(
|
||||
finishImpl: fsFinish,
|
||||
ingestImpl: fsIngest,
|
||||
path: fs.root / $kind / "ingest"
|
||||
)
|
||||
try: stream.file = openAsync(stream.path, fmWrite)
|
||||
except: raise newException(OSError,
|
||||
"failed to create ingest stream at '" & stream.path & "' " & getCurrentExceptionMsg())
|
||||
if size > 0:
|
||||
stream.file.setFileSize(size)
|
||||
stream.leaves = newSeqOfCap[BlobId](leafCount size)
|
||||
else:
|
||||
stream.leaves = newSeq[BlobId]()
|
||||
stream
|
||||
|
||||
proc newFileStore*(root = "/"): FileStore =
|
||||
createDir(root / "data")
|
||||
createDir(root / "blob")
|
||||
proc newFileStore*(root: string): FileStore =
|
||||
## Create a new store object backed by a file-system.
|
||||
try:
|
||||
createDir(root / $dataBlob)
|
||||
createDir(root / $metaBlob)
|
||||
except: discard
|
||||
new result
|
||||
result.openBlobStreamImpl = fsOpenBlobStream
|
||||
result.openIngestStreamImpl = fsOpenIngestStream
|
||||
|
|
|
@ -13,6 +13,7 @@ type
|
|||
rng: Rand
|
||||
|
||||
proc newHttpStoreServer*(backend: BlobStore): HttpStoreServer =
|
||||
## Create a new HTTP server for a given store.
|
||||
randomize()
|
||||
HttpStoreServer(
|
||||
server: newAsyncHttpServer(),
|
||||
|
@ -31,45 +32,44 @@ func parseRange(range: string): tuple[a: int, b: int] =
|
|||
if result.b < result.a:
|
||||
reset result
|
||||
|
||||
proc blobStream(hss: HttpStoreServer; path: string): BlobStream =
|
||||
let elems = path.split '/'
|
||||
if not elems.len == 3:
|
||||
raise newException(ValueError, "bad GET path " & path)
|
||||
let
|
||||
kind = case elems[1]
|
||||
of $dataBlob: dataBlob
|
||||
of $metaBlob: metaBlob
|
||||
else: raise newException(ValueError, "bad GET path " & path)
|
||||
blob = toBlobId elems[2]
|
||||
if hss.blobs.contains blob:
|
||||
result = hss.blobs[blob]
|
||||
else:
|
||||
result = hss.store.openBlobStream(blob, kind=kind)
|
||||
hss.blobs[blob] = result
|
||||
|
||||
proc head(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
var stream: BlobStream
|
||||
let elems = req.url.path.split '/'
|
||||
if elems.len == 3:
|
||||
let
|
||||
kind = case elems[1]
|
||||
of "data": dataBlob
|
||||
of "meta": metaBlob
|
||||
else: raise newException(ValueError, "bad GET path " & req.url.path)
|
||||
blob = toBlobId elems[2]
|
||||
var stream = hss.blobs.getOrDefault blob
|
||||
if stream.isNil:
|
||||
stream = hss.store.openBlobStream(blob, kind=kind)
|
||||
hss.blobs[blob] = stream
|
||||
discard hss.blobStream(req.url.path)
|
||||
# cache the stream in the blob table or raise an exception
|
||||
req.respond(Http200, "")
|
||||
|
||||
proc get(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
var stream: BlobStream
|
||||
let elems = req.url.path.split '/'
|
||||
if elems.len == 3:
|
||||
let stream = hss.blobStream(req.url.path)
|
||||
var
|
||||
pos: BiggestInt
|
||||
len = blobLeafSize
|
||||
let range = req.headers.getOrDefault "range"
|
||||
if range != "":
|
||||
let
|
||||
kind = case elems[1]
|
||||
of "data": dataBlob
|
||||
of "meta": metaBlob
|
||||
else: raise newException(ValueError, "bad GET path " & req.url.path)
|
||||
blob = toBlobId elems[2]
|
||||
var stream = hss.blobs.getOrDefault blob
|
||||
if stream.isNil:
|
||||
stream = hss.store.openBlobStream(blob, kind=kind)
|
||||
hss.blobs[blob] = stream
|
||||
let
|
||||
(pos, endPos) = parseRange req.headers["range"]
|
||||
len = endPos - pos
|
||||
stream.pos = pos
|
||||
var body = newString(len)
|
||||
let n = stream.read(body[0].addr, len)
|
||||
body.setLen n
|
||||
return req.respond(Http206, body)
|
||||
req.respond(Http404, "invalid request")
|
||||
(startPos, endPos) = parseRange range
|
||||
pos = startPos
|
||||
len = endPos - startPos
|
||||
stream.pos = pos
|
||||
var body = newString(len)
|
||||
len = stream.read(body[0].addr, len)
|
||||
body.setLen len
|
||||
let headers = newHttpHeaders({"Range": "bytes=$1-$2" % [ $pos, $(pos+len) ]})
|
||||
req.respond(Http206, body, headers)
|
||||
|
||||
proc putIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
let stream = hss.ingests[req.url.path]
|
||||
|
@ -78,14 +78,14 @@ proc putIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
|||
|
||||
proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
||||
case req.url.path
|
||||
of "/ingest/data":
|
||||
of "/ingest/" & $dataBlob:
|
||||
let
|
||||
size = parseBiggestInt req.headers["ingest-size"]
|
||||
key = "/ingest/" & $hss.rng.next
|
||||
headers = newHttpHeaders({"Location": key})
|
||||
hss.ingests[key] = hss.store.openIngestStream(size, dataBlob)
|
||||
req.respond(Http201, "", headers)
|
||||
of "/ingest/meta":
|
||||
of "/ingest/" & $metaBlob:
|
||||
let
|
||||
size = parseBiggestInt req.headers["ingest-size"]
|
||||
key = "/ingest/" & $hss.rng.next
|
||||
|
@ -104,16 +104,24 @@ proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
|||
req.respond(Http204, "", headers)
|
||||
|
||||
proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
|
||||
## Serve requests to HTTP store.
|
||||
proc handleRequest(req: Request): Future[void] =
|
||||
case req.reqMethod
|
||||
of HttpGET:
|
||||
get(hss, req)
|
||||
of HttpHEAD:
|
||||
head(hss, req)
|
||||
of HttpPUT:
|
||||
putIngest(hss, req)
|
||||
of HttpPOST:
|
||||
postIngest(hss, req)
|
||||
else:
|
||||
req.respond(Http501, "method not implemented")
|
||||
try:
|
||||
case req.reqMethod
|
||||
of HttpGET:
|
||||
get(hss, req)
|
||||
of HttpHEAD:
|
||||
head(hss, req)
|
||||
of HttpPUT:
|
||||
putIngest(hss, req)
|
||||
of HttpPOST:
|
||||
postIngest(hss, req)
|
||||
else:
|
||||
req.respond(Http501, "method not implemented")
|
||||
except KeyError:
|
||||
req.respond(Http404, "blob not found")
|
||||
except ValueError:
|
||||
req.respond(Http400, getCurrentExceptionMsg())
|
||||
except:
|
||||
req.respond(Http500, getCurrentExceptionMsg())
|
||||
hss.server.serve(port, handleRequest)
|
||||
|
|
|
@ -57,8 +57,14 @@ proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: Bl
|
|||
url: $((store.url / $kind) / id.toHex)
|
||||
)
|
||||
let resp = waitFor stream.client.request(stream.url, HttpHEAD)
|
||||
assert(resp.code in {Http200, Http204}, resp.status)
|
||||
stream
|
||||
case resp.code
|
||||
of Http200, Http204:
|
||||
stream
|
||||
of Http404:
|
||||
raise newException(KeyError, "blob not at HTTP store")
|
||||
else:
|
||||
raise newException(KeyError,
|
||||
"unrecognized " & $resp.code & " response from HTTP store: " & resp.status)
|
||||
|
||||
proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||
var s = HttpIngestStream(s)
|
||||
|
|
Loading…
Reference in New Issue