Skip to content

Commit

Permalink
writer: Add WARCWriter as discussed in datatogether#13
Browse files Browse the repository at this point in the history
  • Loading branch information
riking committed Mar 3, 2018
1 parent 707c1f7 commit 9347f4e
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 8 deletions.
17 changes: 9 additions & 8 deletions records.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package warc

// Records is a slice of records
// A WARC format file is the simple concatenation of one or more WARC
// records. The first record usually describes the records to follow. In
// general, record content is either the direct result of a retrieval
// attempt — web pages, inline images, URL redirection information, DNS
// hostname lookup results, standalone files, etc. — or is synthesized
// material (e.g., metadata, transformed content) that provides additional
// information about archived content.
// Records provides utility functions for slices of records.
//
// A WARC format file is the simple concatenation of one or more WARC records.
// The first record usually describes the records to follow. In general,
// record content is either the direct result of a retrieval attempt — web
// pages, inline images, URL redirection information, DNS hostname lookup
// results, standalone files, etc. — or is synthesized material (e.g.,
// metadata, transformed content) that provides additional information about
// archived content.
type Records []*Record

// FilterTypes return all record types that match a provide
Expand Down
169 changes: 169 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,189 @@ package warc

import (
"bytes"
"compress/gzip"
stdErrors "errors"
"fmt"
"io"
"net/http"
"sort"

"github.com/pborman/uuid"
"github.com/pkg/errors"
)

// NewUUID generates a new version 4 uuid
func NewUUID() string {
return fmt.Sprintf("<urn:uuid:%s>", uuid.New())
}

type flusher interface {
Flush() error
}

// Writer provides functionality for writing WARC files in compressed and
// uncompressed formats.
//
// To construct a Writer, call NewWriterCompressed or NewWriterRaw.
type Writer struct {
seekW io.WriteSeeker
wr io.Writer

// RecordCallback will be called after each record is written to the file.
// If a WriteSeeker was not provided, the provided positions will be
// invalid.
RecordCallback func(r *Record, startPos, endPos int64)
}

// NewWriterCompressed initializes a WARC Writer writing to a compressed
// stream. The first parameter should be the "backing stream" of the
// compression. The second parameter is a compress/gzip writer writing to the
// rawFile parameter.
//
// Seek will only be called with whence == io.SeekCurrent and offset == 0.
//
// See also CountWriter() if you need a "fake" Seek implementation.
func NewWriterCompressed(rawFile io.WriteSeeker, cmprsWriter *gzip.Writer) (*Writer, error) {
w := &Writer{
seekW: rawFile,
wr: cmprsWriter,
}
return w, nil
}

// NewWriterRaw initializes a WARC Writer writing to an uncompressed stream.
// If the provided Writer implements io.Seeker, the RecordCallback will be
// available. If the provided Writer implements interface{Flush() error}, it
// will be flushed after every written Record.
//
// See also CountWriter() if you need a "fake" Seek implementation.
func NewWriterRaw(out io.Writer) (*Writer, error) {
w := &Writer{
wr: out,
}
if wseeker, ok := out.(io.WriteSeeker); ok {
w.seekW = wseeker
}
return w, nil
}

type countWriter struct {
count int64
w io.Writer
}

// CountWriter implements a limited version of io.Seeker around the provided
// Writer. It only supports offset == 0 and whence == io.SeekCurrent or
// io.SeekEnd, and returns the current number of written bytes in both cases.
func CountWriter(w io.Writer) io.WriteSeeker {
return &countWriter{count: 0, w: w}
}

// implements io.Writer
func (c *countWriter) Write(p []byte) (int, error) {
n, err := c.w.Write(p)
if n >= 0 {
c.count += int64(n)
}
return n, err
}

var errCountWriterNotImplemented = stdErrors.New("unsupported seek operation")

// implements io.Seeker
func (c *countWriter) Seek(offset int64, whence int) (int64, error) {
if offset != 0 || !(whence == io.SeekCurrent || whence == io.SeekEnd) {
return 0, errCountWriterNotImplemented
}
return c.count, nil
}

func (w *Writer) WriteRecord(r *Record) error {
var startPos, endPos int64
var err2 error
var errs []error

if w.seekW != nil {
startPos, err2 = w.seekW.Seek(0, io.SeekCurrent)
if err2 != nil {
errs = append(errs, errors.Wrap(err2, "warc writer: seek 0"))
}
}

// don't return quite yet - still need to do RecordCallback
err2 = r.Write(w.wr)
if err2 != nil {
errs = append(errs, errors.Wrap(err2, "warc writer: write record"))
}

if gzW, ok := w.wr.(*gzip.Writer); ok {
// flush is not sufficient, need to Close/Reset
err2 = gzW.Close()
if err2 != nil {
errs = append(errs, errors.Wrap(err2, "warc writer: flush"))
}
gzW.Reset(w.seekW)
}
if w.seekW != nil {
endPos, err2 = w.seekW.Seek(0, io.SeekCurrent)
if err2 != nil {
errs = append(errs, errors.Wrap(err2, "warc writer: seek 0"))
}
}

if w.RecordCallback != nil {
w.RecordCallback(r, startPos, endPos)
}

if len(errs) == 1 {
return errs[0]
} else if len(errs) > 1 {
return simpleMultiError(errs)
}
return nil
}

// Close cleans up any resources the warc.Writer might be holding on to.
func (w *Writer) Close() error {
return nil
}

type simpleMultiError []error

func (m simpleMultiError) Cause() error {
return m[0]
}

func (m simpleMultiError) Error() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s\nand %d other errors:\n", m[0], len(m)-1)
for i := 1; i < len(m); i++ {
fmt.Fprintln(&buf, m[i])
}
return buf.String()
}

func (m simpleMultiError) Format(s fmt.State, verb rune) {
switch verb {
case 'v':
if s.Flag('+') {
fmt.Fprintf(s, "%d different errors\n", len(m))
for i := 0; i < len(m); i++ {
fmt.Fprintf(s, "%+v\n", m[i])
}
return
}
fallthrough
case 's', 'q':
fmt.Fprintf(s, "%d different errors\n", len(m))
for i := 0; i < len(m); i++ {
fmt.Fprintf(s, "%s\n", m[i])
}
}
}

// WriteRecords calls Write on each record to w.
// Deprecated: see Writer type
func WriteRecords(w io.Writer, records Records) error {
for _, rec := range records {
if err := rec.Write(w); err != nil {
Expand Down

0 comments on commit 9347f4e

Please sign in to comment.