Skip to content

Commit

Permalink
Merge pull request #72 from ironsmile/feature/mp4Handler
Browse files Browse the repository at this point in the history
Feature/mp4 handler - closes #16
  • Loading branch information
ironsmile committed Sep 17, 2015
2 parents f6a9481 + f542a65 commit 463a4e5
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 103 deletions.
20 changes: 10 additions & 10 deletions handler/cache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (h *reqHandler) carbonCopyProxy() {
}

func (h *reqHandler) knownRanged() {
ranges, err := parseReqRange(h.req.Header.Get("Range"), h.obj.Size)
ranges, err := utils.ParseReqRange(h.req.Header.Get("Range"), h.obj.Size)
if err != nil {
err := http.StatusRequestedRangeNotSatisfiable
http.Error(h.resp, http.StatusText(err), err)
Expand All @@ -92,23 +92,23 @@ func (h *reqHandler) knownRanged() {
}
reqRange := ranges[0]

copyHeadersWithout(h.obj.Headers, h.resp.Header())
h.resp.Header().Set("Content-Range", reqRange.contentRange(h.obj.Size))
h.resp.Header().Set("Content-Length", strconv.FormatUint(reqRange.length, 10))
utils.CopyHeadersWithout(h.obj.Headers, h.resp.Header())
h.resp.Header().Set("Content-Range", reqRange.ContentRange(h.obj.Size))
h.resp.Header().Set("Content-Length", strconv.FormatUint(reqRange.Length, 10))
h.resp.WriteHeader(http.StatusPartialContent)

end := ranges[0].start + ranges[0].length - 1
reader := h.getSmartReader(ranges[0].start, end)
end := ranges[0].Start + ranges[0].Length - 1
reader := h.getSmartReader(ranges[0].Start, end)
if copied, err := io.Copy(h.resp, reader); err != nil {
h.Logger.Errorf("[%p] Error copying response: %s. Copied %d out of %d bytes", h.req, err, copied, reqRange.length)
} else if uint64(copied) != reqRange.length {
h.Logger.Errorf("[%p] Error copying response. Expected to copy %d bytes, copied %d", h.req, reqRange.length, copied)
h.Logger.Errorf("[%p] Error copying response: %s. Copied %d out of %d bytes", h.req, err, copied, reqRange.Length)
} else if uint64(copied) != reqRange.Length {
h.Logger.Errorf("[%p] Error copying response. Expected to copy %d bytes, copied %d", h.req, reqRange.Length, copied)
}
reader.Close()
}

func (h *reqHandler) knownFull() {
copyHeadersWithout(h.obj.Headers, h.resp.Header())
utils.CopyHeadersWithout(h.obj.Headers, h.resp.Header())
h.resp.Header().Set("Content-Length", strconv.FormatUint(h.obj.Size, 10))
h.resp.WriteHeader(h.obj.Code)

Expand Down
46 changes: 15 additions & 31 deletions handler/cache/handler_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strconv"
"time"

"github.com/ironsmile/nedomi/storage"
"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/utils"
)
Expand All @@ -29,21 +28,6 @@ var hopHeaders = []string{

var metadataHeadersToFilter = append(hopHeaders, "Content-Length", "Content-Range")

func copyHeadersWithout(from, to http.Header, exceptions ...string) {
for k := range from {
shouldCopy := true
for _, e := range exceptions {
if e == k {
shouldCopy = false
break
}
}
if shouldCopy {
to[k] = from[k]
}
}
}

// Returns a new HTTP 1.1 request that has no body. It also clears headers like
// accept-encoding and rearranges the requested ranges so they match part
func (h *reqHandler) getNormalizedRequest() *http.Request {
Expand All @@ -58,19 +42,19 @@ func (h *reqHandler) getNormalizedRequest() *http.Request {
Host: h.req.URL.Host,
}

copyHeadersWithout(h.req.Header, result.Header, "Accept-Encoding")
utils.CopyHeadersWithout(h.req.Header, result.Header, "Accept-Encoding")

//!TODO: fix requested range to be divisible by the storage partSize

return result
}

func (h *reqHandler) getDimensions(code int, headers http.Header) (*httpContentRange, error) {
func (h *reqHandler) getDimensions(code int, headers http.Header) (*utils.HTTPContentRange, error) {
rangeStr := headers.Get("Content-Range")
lengthStr := headers.Get("Content-Length")
if code == http.StatusPartialContent {
if rangeStr != "" {
return parseRespContentRange(rangeStr)
return utils.ParseRespContentRange(rangeStr)
}
return nil, errors.New("No Content-Range header")
} else if code == http.StatusOK {
Expand All @@ -79,7 +63,7 @@ func (h *reqHandler) getDimensions(code int, headers http.Header) (*httpContentR
if err != nil {
return nil, err
}
return &httpContentRange{start: 0, length: size, objSize: size}, nil
return &utils.HTTPContentRange{Start: 0, Length: size, ObjSize: size}, nil
}
return nil, errors.New("No Content-Length header")
}
Expand All @@ -90,15 +74,15 @@ func (h *reqHandler) getResponseHook() func(*utils.FlexibleResponseWriter) {

return func(rw *utils.FlexibleResponseWriter) {
h.Logger.Debugf("[%p] Received headers for %s, sending them to client...", h.req, h.req.URL)
copyHeadersWithout(rw.Headers, h.resp.Header(), hopHeaders...)
utils.CopyHeadersWithout(rw.Headers, h.resp.Header(), hopHeaders...)
h.resp.WriteHeader(rw.Code)

//!TODO: handle duration
isCacheable, _ := utils.IsResponseCacheable(rw.Code, rw.Headers)
dims, err := h.getDimensions(rw.Code, rw.Headers)
if !isCacheable || err != nil {
h.Logger.Debugf("[%p] Response is non-cacheable (%s) :(", h.req, err)
rw.BodyWriter = storage.NopCloser(h.resp)
rw.BodyWriter = utils.NopCloser(h.resp)
return
}

Expand All @@ -109,24 +93,24 @@ func (h *reqHandler) getResponseHook() func(*utils.FlexibleResponseWriter) {
ID: h.objID,
ResponseTimestamp: time.Now().Unix(),
Code: rw.Code,
Size: dims.objSize,
Size: dims.ObjSize,
Headers: make(http.Header),
}
copyHeadersWithout(rw.Headers, obj.Headers, metadataHeadersToFilter...)
utils.CopyHeadersWithout(rw.Headers, obj.Headers, metadataHeadersToFilter...)

//!TODO: optimize this, save the metadata only when it's newer
//!TODO: also, error if we already have fresh metadata but the received metadata is different
if err := h.Cache.Storage.SaveMetadata(obj); err != nil {
h.Logger.Errorf("Could not save metadata for %s: %s", obj.ID, err)
rw.BodyWriter = storage.NopCloser(h.resp)
rw.BodyWriter = utils.NopCloser(h.resp)
return
}
}

//!TODO: handle range requests
rw.BodyWriter = storage.MultiWriteCloser(
storage.NopCloser(h.resp),
storage.PartWriter(h.Cache, h.objID, dims.start, dims.length),
rw.BodyWriter = utils.MultiWriteCloser(
utils.NopCloser(h.resp),
utils.PartWriter(h.Cache, h.objID, dims.Start, dims.Length),
)
}
}
Expand Down Expand Up @@ -201,10 +185,10 @@ func (h *reqHandler) getSmartReader(start, end uint64) io.ReadCloser {

// work in start and end
var startOffset, endLimit = start % partSize, end%partSize + 1
readers[0] = storage.SkipReadCloser(readers[0], int(startOffset))
readers[len(readers)-1] = storage.LimitReadCloser(readers[len(readers)-1], int(endLimit))
readers[0] = utils.SkipReadCloser(readers[0], int(startOffset))
readers[len(readers)-1] = utils.LimitReadCloser(readers[len(readers)-1], int(endLimit))

h.Logger.Debugf("[%p] Return smart reader for %s with %d out of %d parts from storage!",
h.req, h.objID, localCount, len(indexes))
return storage.MultiReadCloser(readers...)
return utils.MultiReadCloser(readers...)
}
102 changes: 102 additions & 0 deletions handler/mp4/mp4.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package mp4

import (
"fmt"
"net/http"
"net/url"
"strconv"
"time"

"golang.org/x/net/context"

"github.com/MStoykov/mp4"
"github.com/MStoykov/mp4/clip"

"github.com/ironsmile/nedomi/config"
"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/utils"
)

const (
startKey = "start"
firstRequestSize = 4096
)

var errUnsatisfactoryResponse = fmt.Errorf("unsatisfactory response from the next handler")

// New creates and returns a ready to used ServerStatusHandler.
func New(cfg *config.Handler, l *types.Location, next types.RequestHandler) (types.RequestHandler, error) {
// !TODO parse config
return &mp4Handler{
next: next,
logger: l.Logger,
}, nil
}

type mp4Handler struct {
next types.RequestHandler
logger types.Logger
}

func copyRequest(r *http.Request) *http.Request {
req := *r
req.Header = http.Header{}
url := *r.URL
req.URL = &url
utils.CopyHeadersWithout(r.Header, req.Header)
return &req
}

func removeQueryArgument(u *url.URL, arguments ...string) {
query := u.Query()
for _, argument := range arguments {
query.Del(argument)
}
u.RawQuery = query.Encode()
}

func (m *mp4Handler) RequestHandle(ctx context.Context, w http.ResponseWriter, r *http.Request, l *types.Location) {
// Handle only GET requests with ContentLength of 0 without a Range header
if r.Method != "GET" || len(r.Header.Get("Range")) > 0 || r.ContentLength > 0 {
m.next.RequestHandle(ctx, w, r, l)
return
}

// parse the request
var start, err = strconv.Atoi(r.URL.Query().Get(startKey))
if err != nil || 0 >= start { // that start is not ok
m.next.RequestHandle(ctx, w, r, l)
return
}
var startTime = time.Duration(start) * time.Second
var newreq = copyRequest(r)
removeQueryArgument(newreq.URL, startKey)

var rr = &rangeReader{ctx: ctx, req: copyRequest(r), location: l, next: m.next}
var video *mp4.MP4
video, err = mp4.Decode(rr)
if err != nil {
m.logger.Errorf("error from the mp4.Decode - %s", err)
m.next.RequestHandle(ctx, w, r, l)
return
}
if video == nil || video.Moov == nil { // missing something?
m.next.RequestHandle(ctx, w, r, l)
return
}

cl, err := clip.New(video, startTime, rr)
if err != nil {
m.logger.Errorf("error while clipping a video(%+v) - %s", video, err)
m.next.RequestHandle(ctx, w, r, l)
return
}
w.Header().Set("Content-Type", "video/mp4") // copy it from next
w.Header().Set("Content-Length", strconv.FormatUint(cl.Size(), 10))
w.WriteHeader(http.StatusOK)
size, err := cl.WriteTo(w)
m.logger.Debugf("wrote %d", size)
if err != nil {
m.logger.Errorf("error on writing the clip response - %s", err)
}
}
40 changes: 40 additions & 0 deletions handler/mp4/range_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mp4

import (
"io"
"net/http"

"golang.org/x/net/context"

"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/utils"
)

type rangeReader struct {
ctx context.Context
req *http.Request
location *types.Location
next types.RequestHandler
}

func (rr *rangeReader) Range(start, length uint64) io.ReadCloser {
newreq := copyRequest(rr.req)
newreq.Header.Set("Range", utils.HTTPRange{Start: start, Length: length}.Range())
var in, out = io.Pipe()
flexible := utils.NewFlexibleResponseWriter(func(frw *utils.FlexibleResponseWriter) {
if frw.Code != http.StatusPartialContent {
out.CloseWithError(errUnsatisfactoryResponse)
}
frw.BodyWriter = out
})
go func() {
defer out.Close()
rr.next.RequestHandle(rr.ctx, flexible, newreq, rr.location)
}()

return in
}

func (rr *rangeReader) RangeRead(start, length uint64) (io.ReadCloser, error) {
return rr.Range(start, length), nil
}
5 changes: 5 additions & 0 deletions handler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ironsmile/nedomi/handler/cache"
"github.com/ironsmile/nedomi/handler/dir"
"github.com/ironsmile/nedomi/handler/flv"
"github.com/ironsmile/nedomi/handler/mp4"
"github.com/ironsmile/nedomi/handler/status"
"github.com/ironsmile/nedomi/handler/throttle"
"github.com/ironsmile/nedomi/handler/via"
Expand All @@ -32,6 +33,10 @@ var handlerTypes = map[string]newHandlerFunc{
return flv.New(cfg, l, next)
},

"mp4": func(cfg *config.Handler, l *types.Location, next types.RequestHandler) (types.RequestHandler, error) {
return mp4.New(cfg, l, next)
},

"status": func(cfg *config.Handler, l *types.Location, next types.RequestHandler) (types.RequestHandler, error) {
return status.New(cfg, l, next)
},
Expand Down
3 changes: 1 addition & 2 deletions utils/flexible_response_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/http"
"testing"

"github.com/ironsmile/nedomi/storage"
"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/upstream"
"github.com/ironsmile/nedomi/utils"
Expand All @@ -25,7 +24,7 @@ func testHandler(t *testing.T, u types.Upstream, path, expRespBody string, expRe
if frw.Code != expRespCode {
t.Errorf("Expected response code %d for %s but received %d", expRespCode, path, frw.Code)
}
frw.BodyWriter = storage.NopCloser(buf)
frw.BodyWriter = utils.NopCloser(buf)
}

resp := utils.NewFlexibleResponseWriter(hook)
Expand Down
19 changes: 19 additions & 0 deletions utils/headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils

import "net/http"

// CopyHeadersWithout copies headers from `from` to `to` except for the `exceptions`
func CopyHeadersWithout(from, to http.Header, exceptions ...string) {
for k := range from {
shouldCopy := true
for _, e := range exceptions {
if e == k {
shouldCopy = false
break
}
}
if shouldCopy {
to[k] = from[k]
}
}
}
Loading

0 comments on commit 463a4e5

Please sign in to comment.