Gigahorse 0.2.0

Gigahorse 0.2.0 is now released. The main change is that it now abstracts over two backends. In #12, @alexdupre contributed the migration from AHC 1.9 to AHC 2.0, which is based on Netty 4.

In addition, there’s now experimental Akka HTTP support, which I added in #15.

Please see the Gigahorse docs for details.

using functions

Continuing from Gigahorse 0.1.x, the bread-and-butter function of Gigahorse remains http.run(r, f), which runs the request r and uses f to transform the response object into a value of type A:

val f = http.run(r, Gigahorse.asString andThen {_.take(60)})

The class previously called Response has been renamed to FullResponse. A FullResponse represents a response whose entire body has already been retrieved into memory.
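The shape of http.run(r, f) can be illustrated without a network call. The following is a minimal sketch with stand-in types: `FakeResponse`, `fakeRun`, and this `asString` are hypothetical names made up for the illustration, not part of Gigahorse. It only shows how a response-to-value function composes with `andThen`.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a fully buffered response; not a Gigahorse type.
final case class FakeResponse(status: Int, body: String)

object RunSketch {
  // Hypothetical stand-in for http.run: applies f to an already complete response.
  def fakeRun[A](response: FakeResponse, f: FakeResponse => A): Future[A] =
    Future(f(response))

  // Plays the role of Gigahorse.asString in this sketch.
  val asString: FakeResponse => String = _.body

  def main(args: Array[String]): Unit = {
    val r = FakeResponse(200, "<!doctype html><html><head></head></html>")
    // Compose the body extractor with a truncation step, as in the post.
    val f = fakeRun(r, asString andThen { _.take(15) })
    println(Await.result(f, 5.seconds)) // prints "<!doctype html>"
  }
}
```

Because f is an ordinary function, any post-processing step can be chained onto it before the request is run.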

Async processing with Reactive Streams

When the content is relatively small, retrieving everything first might be OK, but for things like downloading files, we would want to process the content in chunks as we receive them.

scala> import gigahorse._, support.asynchttpclient.Gigahorse
scala> import scala.concurrent._, duration._
scala> import ExecutionContext.Implicits._
scala> import java.io.File

scala> Gigahorse.withHttp(Gigahorse.config) { http =>
         val file = new File(new File("target"), "Google_2015_logo.svg")
         val r = Gigahorse.url("https://upload.wikimedia.org/wikipedia/commons/2/2f/Google_2015_logo.svg")
         val f = http.download(r, file)
         Await.result(f, 120.seconds)
       }
res0: java.io.File = target/Google_2015_logo.svg

Thanks to Lightbend implementing Reactive Streams on both Akka HTTP and AHC (#963), Gigahorse can abstract over both backends as a Reactive Streams stream of bytes or Strings. Stream processing is provided by http.runStream(r, f).

  /** Runs the request and returns a Future of A. */
  def runStream[A](request: Request, f: StreamResponse => Future[A]): Future[A]

Note that the function takes a StreamResponse instead of a FullResponse. Unlike a FullResponse, a StreamResponse has not received the body contents yet.

Instead, a StreamResponse can create a Stream[A] that retrieves the parts on demand. As a starting point, Gigahorse provides Gigahorse.asByteStream and Gigahorse.asStringStream.

import org.reactivestreams.Publisher
import scala.concurrent.Future

abstract class Stream[A] {
  /**
   * @return The underlying Stream object.
   */
  def underlying[A]

  def toPublisher: Publisher[A]

  /** Runs f on each element received to the stream. */
  def foreach(f: A => Unit): Future[Unit]

  /** Runs f on each element received to the stream with its previous output. */
  def fold[B](zero: B)(f: (B, A) => B): Future[B]

  /** Similar to fold but uses first element as zero element. */
  def reduce(f: (A, A) => A): Future[A]
}
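To get a feel for these three operations, here is a minimal in-memory sketch. `SeqStream` is a hypothetical stand-in backed by a plain `Seq`, not a Gigahorse class; a real backend would receive its elements asynchronously from the network. It also shows that foreach can be written in terms of fold.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical in-memory stand-in for the Stream[A] interface above.
final class SeqStream[A](elems: Seq[A]) {
  /** Combines every element with the accumulated value, starting from zero. */
  def fold[B](zero: B)(f: (B, A) => B): Future[B] =
    Future(elems.foldLeft(zero)(f))

  /** foreach expressed as a fold that discards its accumulator. */
  def foreach(f: A => Unit): Future[Unit] =
    fold(())((_, a) => f(a))

  /** Like fold, but seeded with the first element; the Future fails if there are no elements. */
  def reduce(f: (A, A) => A): Future[A] =
    Future(elems.reduceLeft(f))
}

object SeqStreamDemo {
  def main(args: Array[String]): Unit = {
    val chunks = new SeqStream(Seq("Giga", "horse", " 0.2.0"))
    val totalLength = chunks.fold(0)((acc, s) => acc + s.length)
    val joined = chunks.reduce(_ + _)
    println(Await.result(totalLength, 5.seconds)) // prints 15
    println(Await.result(joined, 5.seconds))      // prints "Gigahorse 0.2.0"
  }
}
```

Each operation returns a Future because, in the real setting, the result is only known once the last element has arrived.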

Using this, we can process a stream with relative ease. For example, download is implemented as follows:

  def download(request: Request, file: File): Future[File] =
    runStream(request, asFile(file))

....

import java.nio.ByteBuffer
import java.nio.charset.Charset
import java.io.{ File, FileOutputStream }
import scala.concurrent.Future

object DownloadHandler {
  /** Function from `StreamResponse` to `Future[File]` */
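  def asFile(file: File): StreamResponse => Future[File] = (response: StreamResponse) =>
    {
      // The response body as a stream of ByteBuffer chunks.
      val stream = response.byteBuffers
      val out = new FileOutputStream(file).getChannel
      // Write each chunk to the channel; the accumulator is just the target file.
      stream.fold(file)((acc, bb) => {
        out.write(bb)
        acc
      })
    }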
}

stream.fold will write into the FileOutputStream as the parts arrive.
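The same fold-and-write pattern can be tried locally without any HTTP backend. The sketch below is an illustration under assumptions: it folds over an ordinary `Seq[ByteBuffer]` instead of an asynchronous stream, and `ChunkWriteSketch` and `writeChunks` are hypothetical names. Draining each buffer in a loop and closing the channel at the end are choices made for this sketch, not a description of what Gigahorse does.

```scala
import java.io.{ File, FileOutputStream }
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object ChunkWriteSketch {
  /** Writes each chunk to the file in order and returns the file. */
  def writeChunks(chunks: Seq[ByteBuffer], file: File): File = {
    val out = new FileOutputStream(file).getChannel
    try
      chunks.foldLeft(file) { (acc, bb) =>
        // A single write may not drain the buffer, so loop until it is empty.
        while (bb.hasRemaining) out.write(bb)
        acc
      }
    finally out.close() // release the file handle once all chunks are written
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("chunks", ".txt")
    file.deleteOnExit()
    val chunks = Seq("<svg>", "...", "</svg>")
      .map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
    writeChunks(chunks, file)
    println(new String(Files.readAllBytes(file.toPath), StandardCharsets.UTF_8)) // prints "<svg>...</svg>"
  }
}
```

The accumulator never changes here; fold is used only to sequence the side effect of writing and to hand the file back at the end.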

Newline delimited stream

Here’s another example, this time using Akka HTTP. Suppose we are running $ python -m SimpleHTTPServer 8000, which serves the current directory over port 8000, and let’s say we want to take README.markdown and print each line:

scala> import gigahorse._, support.akkahttp.Gigahorse
scala> import scala.concurrent._, duration._

scala> Gigahorse.withHttp(Gigahorse.config) { http =>
         val r = Gigahorse.url("http://localhost:8000/README.markdown").get
         val f = http.runStream(r, Gigahorse.asStringStream andThen { xs =>
           xs.foreach { s => println(s) }
         })
         Await.result(f, 120.seconds)
       }
Gigahorse
==========

Gigahorse is an HTTP client for Scala with Async Http Client or Lightbend Akka HTTP underneath.
....

It worked. This could be used to process an infinite stream of newline-delimited JSON.
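For intuition about newline delimiting, here is a rough sketch of how raw text chunks that break at arbitrary points could be re-framed into complete lines. This is not taken from Gigahorse's implementation; `LineFramingSketch`, `Framed`, and `step` are hypothetical names, and the input is a fixed `Seq` of chunks rather than a live stream.

```scala
object LineFramingSketch {
  /** State carried between chunks: completed lines plus the unfinished tail. */
  final case class Framed(lines: Vector[String], partial: String)

  /** Appends a chunk, emitting every complete line and keeping the remainder. */
  def step(state: Framed, chunk: String): Framed = {
    // Limit -1 keeps a trailing empty string, so a chunk ending in "\n" leaves an empty tail.
    val parts = (state.partial + chunk).split("\n", -1)
    Framed(state.lines ++ parts.init, parts.last)
  }

  def main(args: Array[String]): Unit = {
    // Three JSON records, split across chunk boundaries in awkward places.
    val chunks = Seq("{\"id\":1}\n{\"i", "d\":2}\n{\"id\"", ":3}\n")
    val result = chunks.foldLeft(Framed(Vector.empty, ""))(step)
    result.lines.foreach(println)
  }
}
```

For a truly infinite stream, accumulating lines in a Vector would grow without bound; each completed line would instead be handed to a handler (for example a JSON parser) as soon as it is framed.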

about Reactive Streams

To learn about Reactive Streams, head over to Konrad’s Reactive Streams talk.