Gigahorse は、Scala のための HTTP クライアントで複数のバックエンド実装をサポートする。 内部実装として Apache HTTP HttpAsyncClient か Async Http Client か Square OkHttp か Akka HTTP から選ぶことができる。
Apache HTTP HttpAsyncClient を使う場合:
libraryDependencies += "com.eed3si9n" %% "gigahorse-apache-http" % "0.7.0"
Async HTTP Client を使う場合:
libraryDependencies += "com.eed3si9n" %% "gigahorse-asynchttpclient" % "0.7.0"
Square OkHttp 3.x を使う場合:
libraryDependencies += "com.eed3si9n" %% "gigahorse-okhttp" % "0.7.0"
Akka HTTP は現段階では実験的サポート:
libraryDependencies += "com.eed3si9n" %% "gigahorse-akka-http" % "0.7.0"
Apache v2
Gigahorse を使った簡単な GET 呼び出しの例をみてみよう:
scala> import gigahorse._, support.apachehttp.Gigahorse
scala> import scala.concurrent._, duration._
scala> val http = Gigahorse.http(Gigahorse.config)
scala> val r = Gigahorse.url("https://api.duckduckgo.com").get.
addQueryString(
"q" -> "1 + 1"
)
scala> val f = http.run(r, Gigahorse.asString andThen {_.take(60)})
scala> Await.result(f, 120.seconds)
scala> http.close()
Gigahorse
は、色々便利なものを作るためのヘルパーオブジェクトだ。
gigahorse.support.apachehttp.Gigahorse
。
gigahorse.support.asynchttpclient.Gigahorse
。
gigahorse.support.okhttp.Gigahorse
。
gigahorse.support.akkahttp.Gigahorse
。
HttpClient
は、複数のリクエストを処理することのできる HTTP クライアントを表す。
使用時には多くのスレッドを生成するので、HttpClient
のライフタイム管理に気を付ける必要がある。
これに無頓着だと、プログラムはリソース枯渇に陥る可能性がある。
HttpClient
を作るには 2つの方法がある。
第一の方法としては、Gigahorse.http(Gigahourse.config)
を使って
HttpClient
を作ることだ。Apache HTTP や AHC を使ってこの方法を取った場合、必ずクライアントを閉じる必要がある。
scala> import gigahorse._, support.apachehttp.Gigahorse
scala> val http = Gigahorse.http(Gigahorse.config)
scala> http.close() // must call close()
第二の方法は loan パターン Gigahorse.withHttp(config) { ... }
を使うことだ:
import gigahorse._, support.apachehttp.Gigahorse
Gigahorse.withHttp(Gigahorse.config) { http =>
// do something
}
これは、HttpClient
を閉じることを保証するけども、短所としては
全ての HTTP 処理が完了する前に閉じてしまう可能性があるので、
中で全ての Future
をブロックさせる必要がある。
HttpClient
を作るには Config
を渡す必要がある。
Gigahorse.config
は application.conf
があればそこから設定を読み込んで、
無ければデフォルト値を選んでくれる。
scala> Gigahorse.config
Request
は、単一の HTTP リクエストを表す不変 (immutable) なデータ型だ。
HttpClient
と違って、これは比較的に気軽に作ったり、取り回したりすることができる。
リクエストを構築するには、Gigahorse.url(...)
関数を呼ぶ:
scala> val r = Gigahorse.url("https://api.duckduckgo.com").get.
addQueryString(
"q" -> "1 + 1",
"format" -> "json"
)
上記のように、新しいリクエスト値を返す呼び出しを連鎖させることができる。
HttpClient
には多くのメソッドが定義されているが、おそらく最も便利なのは
http.run(r, f)
メソッドだ:
abstract class HttpClient extends AutoCloseable {
/** Runs the request and return a Future of A. Errors on non-OK response. */
def run[A](request: Request, f: FullResponse => A): Future[A]
....
}
第一パラメータは Request
を受け取り、第二パラメータは FullResponse
から A
への関数を受け取る。
Gigahorse.asString
という関数が予めついてきて、これはボディーのコンテンツを String
で返す。
さらに、これはただの関数なので andThen
を使って他の関数と合成することができる:
scala> import gigahorse._, support.apachehttp.Gigahorse
scala> import scala.concurrent._, duration._
scala> val http = Gigahorse.http(Gigahorse.config)
scala> val r = Gigahorse.url("https://api.duckduckgo.com").get.
addQueryString(
"q" -> "1 + 1"
)
scala> val f = http.run(r, Gigahorse.asString andThen {_.take(60)})
scala> Await.result(f, 120.seconds)
scala> http.close()
注意: OkHttp もしくは Akka HTTP を用いてレスポンスのボディーを消費しない場合は、
リソースを解放するために close()
メソッドを呼び出す必要がある。
run
はリクエストを非同期に実行するため、これは Future
を返す。
普通は、Future
をできる限り Future
のままで保っていると思うが、
ここでは値を見るために敢えてブロックしている。
Future
の中に値をできる限り保っておく動機として、並列に複数の
Future
(この場合 HTTP リクエスト) を取り扱うことができるということが挙げられる。
Future に関する詳細は Futures and Promises を参照してほしい。
Gigahorse は、全レスポンスに対して run
するのだけではなく、
返ってきた部分レスポンスを Reactive Stream として扱って、
例えば行ごとに処理するということも可能だ。
Request
値の構築を構築するには Gigahorse.url(...)
関数を呼び出す:
scala> import gigahorse._, support.okhttp.Gigahorse
scala> val url = "https://api.duckduckgo.com"
scala> val r = Gigahorse.url(url)
次に、Request
のメソッドをつなげていくことで新しい Request
値を作っていく。
HTTP verb (GET, POST, PATCH, PUT, DELETE, HEAD, OPTIONS) それぞれに対してメソッドがある。
scala> import java.io.File
scala> Gigahorse.url(url).get
scala> Gigahorse.url(url).post("")
scala> Gigahorse.url(url).post(new File("something.txt"))
post(...)
、put(...)
、patch(...)
メソッドは A: HttpWrite
という context bound のある
型パラメータ A
を受け取るオーバーロードがあるため、あらゆるカスタム型に対応できるように拡張することができる。
/** Uses GET method. */
def get: Request = this.withMethod(HttpVerbs.GET)
/** Uses POST method with the given body. */
def post[A: HttpWrite](body: A): Request = this.withMethod(HttpVerbs.POST).withBody(body)
/** Uses POST method with the given body. */
def post(body: String, charset: Charset): Request = this.withMethod(HttpVerbs.POST).withBody(EncodedString(body, charset))
/** Uses POST method with the given file. */
def post(file: File): Request = this.withMethod(HttpVerbs.POST).withBody(FileBody(file))
HTTP 認証を使う必要がある場合は、ユーザ名、パスワード、AuthScheme
を用いて Request
で指定することができる。
有効な AuthScheme
の値は Basic
、 Digest
、 NTLM
、 Kerberos
、 SPNEGO
となっている。
scala> Gigahorse.url(url).get.withAuth("username", "password", AuthScheme.Basic)
withAuth(...)
には Realm
値を受け取るオーバーロードもあって、それはより細かい設定をすることができる。
パラーメータはキーと値のタプルで設定できる。
scala> Gigahorse.url(url).get.
addQueryString(
"q" -> "1 + 1",
"format" -> "json"
)
テキストを POST する場合、Content-Type
ヘッダを指定するべきだ。
scala> import java.nio.charset.Charset
scala> Gigahorse.url(url).post("some text").
withContentType(MimeTypes.TEXT, Gigahorse.utf8)
ヘッダはキーと値のタプルで設定できる。
scala> Gigahorse.url(url).get.
addHeaders(
HeaderNames.AUTHORIZATION -> "bearer ****"
)
バーチャルホストは文字列で設定できる。
scala> Gigahorse.url(url).get.withVirtualHost("192.168.1.1")
Config
で指定された値を上書きしてリクエストタイムアウトを指定したい場合は、
withRequestTimeout
を使って設定できる。無期限を指定する場合は Duration.Inf
を渡す。
scala> import scala.concurrent._, duration._
scala> Gigahorse.url(url).get.withRequestTimeout(5000.millis)
フォームエンコードされたデータを POST で送信する Request
を作るには
post
に Map[String, List[String]]
を渡す。
scala> val r = Gigahorse.url("http://www.freeformatter.com/json-validator.html").
post(Map("inputString" -> List("{}")))
post
、put
、patch
メソッドを使ってファイル送信のための Request
値を作ることができる。
scala> Gigahorse.url(url).post(new File("something.txt"))
Request
値が構築できたら、次に HttpClient
に渡して、
run
、download
、processFull
, runStream
といったメソッドを使って実行することができる。
HttpClient
には多くのメソッドが定義されているが、おそらく最も便利なのは
http.run(r, f)
メソッドだ。基本的な概念のページで見たようにこれは、
Request
値と FullResponse => A
の関数を受け取る。
Gigahorse は、Future[String]
を返すために Gigahorse.asString
という関数を提供するが、
これを拡張して他の型に応用できることは想像に難くない。
一つ注意するべきなのは、run
メソッドは HTTP 2XX 番台のステータスのみを受け付け、
その他の場合は Future
を失敗させるということだ。(デフォルトの設定では、3XX のリダイレクトは自動的に処理される)
関数を渡すのに加え、中の値を map することで簡単に Future
を後付けで処理することができる。
import gigahorse._, support.okhttp.Gigahorse
import scala.concurrent._, duration._
import ExecutionContext.Implicits._
val http = Gigahorse.http(Gigahorse.config)
val r = Gigahorse.url("https://api.duckduckgo.com").get.
addQueryString(
"q" -> "1 + 1"
)
val f0: Future[FullResponse] = http.run(r, identity)
val f: Future[String] = f0 map { Gigahorse.asString andThen (_.take(60)) }
Await.result(f, 120.seconds)
Future
に対して何らかの演算を行うときは、implicit な実行コンテキストが必要となる。
実行コンテキストは、Future
のコールバックがどのスレッドプールで実行されるかを宣言するものだ。
便宜上、Request
のみを受け取る run
のオーバーロードもある。
Future
が失敗する場合があるときによく行われる処理として、内部の A
を
Either[Throwable, A]
に持ち上げるということが行われる。
http://getstatuscode.com/ という便利なウェブサイトがあって、これは
任意の HTTP ステータスを返すことができる。失敗した Future
に対してブロックするとどうなるかをみてみよう。
val r = Gigahorse.url("http://getstatuscode.com/500")
val f = http.run(r, Gigahorse.asString)
Await.result(f, 120.seconds)
Gigahorse.asEither
という機構を使って A
を Either[Throwable, A]
に持ち上げることができる。
val r = Gigahorse.url("http://getstatuscode.com/500")
val f = http.run(r, Gigahorse.asEither)
Await.result(f, 120.seconds)
asEither
は右バイアスのかかった Either
として map
することもできる。
val r = Gigahorse.url("http://getstatuscode.com/200")
val f = http.run(r, Gigahorse.asEither map {
Gigahorse.asString andThen (_.take(60)) })
Await.result(f, 120.seconds)
non-2XX レスポンスでエラーを投げたくなくて、例えば 500 レスポンスのボディーテキストを
読み込みたい場合は processFull
メソッドを使う。
val r = Gigahorse.url("http://getstatuscode.com/500")
val f = http.processFull(r, Gigahorse.asString andThen (_.take(60)))
Await.result(f, 120.seconds)
ここまでは、ボディーコンテンツの全てをメモリ上に受け取った
FullResponse
の処理をみてきた。
コンテンツが比較的小さい場合はそれでもいいかもしれないが、
例えばファイルをダウンロードする場合などはコンテンツの
チャンクを受け取り次第に処理していきたい。
http.download
メソッドを使ってファイルをダウンロードすることができる。
scala> import gigahorse._, support.okhttp.Gigahorse
scala> import scala.concurrent._, duration._
scala> import ExecutionContext.Implicits._
scala> import java.io.File
scala> val http = Gigahorse.http(Gigahorse.config)
scala> {
val file = new File(new File("target"), "Google_2015_logo.svg")
val r = Gigahorse.url("https://upload.wikimedia.org/wikipedia/commons/2/2f/Google_2015_logo.svg")
val f = http.download(r, file)
Await.result(f, 120.seconds)
}
これは Future[File]
を返す。
http.runStream(r, f)
を使うと返ってきたレスポンスを
Reactive Stream として取り扱って、パーツごとに処理することができる。
/** Runs the request and return a Future of A. */
def runStream[A](request: Request, f: StreamResponse => Future[A]): Future[A]
ここで注目してほしいのは、関数が FullResponse
ではなくて StreamResponse
を受け取ることだ。FullResponse
と違って、StreamResponse
はボディーコンテンツをまだ受け取っていない。
その代わりに StreamResponse
は、コンテンツのパーツをオンデマンドで受け取る
Stream[A]
を作ることができる。
出発点として、Gigahorse は Gigahorse.asByteStream
と
Gigahorse.asStringStream
を提供する。
Stream[A]
はこのような実装になっている:
import org.reactivestreams.Publisher
import scala.concurrent.Future
abstract class Stream[A] {
/**
* @return The underlying Stream object.
*/
def underlying[B]: B
def toPublisher: Publisher[A]
/** Runs f on each element received to the stream. */
def foreach(f: A => Unit): Future[Unit]
/** Runs f on each element received to the stream with its previous output. */
def fold[B](zero: B)(f: (B, A) => B): Future[B]
/** Similar to fold but uses first element as zero element. */
def reduce(f: (A, A) => A): Future[A]
}
これを使えば比較的簡単にストリーム処理を行うことができる。
例えば、download
は以下のように実装されている。
def download(request: Request, file: File): Future[File] =
runStream(request, asFile(file))
....
import java.nio.ByteBuffer
import java.nio.charset.Charset
import java.io.{ File, FileOutputStream }
import scala.concurrent.Future
object DownloadHandler {
/** Function from `StreamResponse` to `Future[File]` */
def asFile(file: File): StreamResponse => Future[File] = (response: StreamResponse) =>
{
val stream = response.byteBuffers
val out = new FileOutputStream(file).getChannel
stream.fold(file)((acc, bb) => {
out.write(bb)
acc
})
}
}
これはパーツが届くと FileOutputStream
に書き込んでいる。
Akka HTTP を使った例もみてみる。
$ python -m SimpleHTTPServer 8000
を実行してカレントディレクトリを
8000番ポートでサーブしているとして、
README.markdown
の各行を表示したい。
scala> import gigahorse._, support.akkahttp.Gigahorse
import gigahorse._
import support.akkahttp.Gigahorse
scala> import scala.concurrent._, duration._
import scala.concurrent._
import duration._
scala> Gigahorse.withHttp(Gigahorse.config) { http =>
val r = Gigahorse.url("http://localhost:8000/README.markdown").get
val f = http.runStream(r, Gigahorse.asStringStream andThen { xs =>
xs.foreach { s => println(s) }
})
Await.result(f, 120.seconds)
}
Gigahorse
==========
Gigahorse is an HTTP client for Scala with Async Http Client or Lightbend Akka HTTP underneath.
....
うまくいった。これは JSON が入った無限ストリームを処理するのに使える。
Gigahorse.config
は application.conf
があればそこから設定を読み込む。
gigahorse.followRedirects
: 301、および、302 でのリダイレクトにクライアントが従うかを設定する。 (デフォルトは true).
gigahorse.useProxyProperties
: JVM システムの HTTP プロキシ設定 (http.proxyHost, http.proxyPort) を使用するか設定する。 (デフォルトは true).
gigahorse.userAgent
: User-Agent ヘッダーフィールドを設定する。
gigahorse.compressionEnforced
: このプロパティが true の場合 gzip/deflater によるエンコーディングを行う。 (デフォルトは false).
HTTP over SSL/TLS (HTTPS) に関する Gigahorse の設定については、 Play WS の [WS SSLの設定]を参照してほしい。
設定値は gigahorse.ssl
内で行う:
gigahorse.ssl {
trustManager = {
stores = [
{ type = "JKS", path = "exampletrust.jks" }
]
}
}
Gigahorse には 3つの異なるタイムアウトがある。タイムアウトになると、リクエストは中断される。
gigahorse.connectTimeout
: リモートホストとの接続を行う最大の時間 (デフォルトは 120 秒).
gigahorse.requestTimeout
: リクエストにかかる全体の時間 (リモートホストがデータを送信中であっても、中断する可能性がある) (デフォルトは 120 秒).
gigahorse.readTimeout
: アイドル状態 (コネクションは確立したが、データを待っている状態) を保持する最大の時間 (デフォルトは 120 秒).
個々のリクエストのタイムアウトは withRequestTimeout()
を使用することで上書き可能だ。 (Request 値の構築を参照。)
他に以下のような設定がある。
詳しくは AsyncHttpClientConfig のドキュメントを参照してほしい。
gigahorse.maxRedirects
: リダイレクトの最大数 (デフォルトは 5).
gigahorse.maxRequestRetry
: 失敗時の再試行の最大数 (デフォルトは 5).
gigahorse.disableUrlEncoding
: URL エンコーディングせずに生の URL を使うべきかどうか (デフォルトはfalse).
gigahorse.keepAlive
: コネクションプーリングを行うかどうか (デフォルトは true).
gigahorse.pooledConnectionIdleTimeout
: プール内でアイドル状態が続いた時に接続が閉じられる時間。
gigahorse.connectionTtl
: プール内で接続が生き続ける最大の時間。
gigahorse.maxConnections
: 最大接続数。無限の場合は -1。
gigahorse.maxConnectionsPerHost
: ホストあたりの最大接続数。無限の場合は -1。
Gigahorse に何らかのファイルフォーマット、もしくは RESTful API へのサポートを提供するなど色々な方法で拡張することができる。
ここでも Gigahorse の基本パターンである http.run(r, f)
を応用することができる。
Gigahorse プラグインは以下のものを提供する:
Request
データ型を構築するためのヘルパー。
Response
から他の何か役に立つものに変換するための関数群。
ここではプラグインの書き方の一つの方法を紹介する。
まずは Request
を返すメソッドを 1つ持つ RequestBuilder
を定義する:
scala> import gigahorse._, support.okhttp.Gigahorse
scala> import scala.concurrent._, duration._
scala> :paste
abstract class RequestBuilder {
protected val baseUrl = "https://api.github.com"
def build: Request
}
GET /repos/:owner/:repo をラッピングするには、以下のような case class を定義してリクエストを表す:
scala> :paste
case class Repos(owner: String, name: String) extends RequestBuilder {
def build: Request = Gigahorse.url(s"$baseUrl/repos/$owner/$name")
}
通常 RESTful API は何らかの認証方法を提供する。以下のラッパーは各リクエストに OAuth 処理を加えるためのものだ:
scala> import collection.immutable.Map
scala> :paste
/** AbstractClient is a function to wrap API operations */
abstract class AbstractClient {
def httpHeaders: Map[String, String] =
Map(HeaderNames.ACCEPT -> MimeTypes.JSON)
def complete(request: Request): Request =
if (httpHeaders.isEmpty) request
else request.addHeaders(httpHeaders.toList: _*)
def apply(builder: RequestBuilder): Request =
complete(builder.build)
}
case class NoAuthClient() extends AbstractClient {
}
case class OAuthClient(token: String) extends AbstractClient {
override def httpHeaders: Map[String, String] =
super.httpHeaders ++ Map("Authorization" -> "bearer %s".format(token))
override def toString: String =
s"OAuthClient(****)"
}
Gigahorse
object 同様に、Github
object を提供して必要な関数を一ヶ所にまとめる。
scala> :paste
object Github {
def noAuthClient = NoAuthClient()
def oauthClient(token: String) =
OAuthClient(token)
def repo(owner: String, name: String): Repos =
Repos(owner, name)
}
これは以下のように呼び出す:
scala> val client = Github.noAuthClient
client: NoAuthClient = NoAuthClient()
scala> val http = Gigahorse.http(Gigahorse.config)
http: gigahorse.HttpClient = gigahorse.support.okhttp.OkhClient@182ab1c0
scala> {
val f = http.run(client(Github.repo("eed3si9n", "gigahorse")), Gigahorse.asString andThen (_.take(60)) )
Await.result(f, 120.seconds)
}
res0: String = {"id":64110679,"name":"gigahorse","full_name":"eed3si9n/giga
次に、返された JSON 値のためのパーサーを提供したい。 手書きで case class と JSON コーデックを定義してもいいけども、contraband の JSON データバインディング機能を使ってみよう。 これは、以下のようなスキーマからデータ型とコーデックの両方を自動生成する:
{
"codecNamespace": "example.github.response",
"fullCodec": "CustomJsonProtocol",
"types": [
{
"name": "Repo",
"namespace": "example.github.response",
"type": "record",
"target": "Scala",
"fields": [
{
"name": "url",
"type": "String",
"since": "0.0.0"
},
{
"name": "name",
"type": "String",
"since": "0.0.0"
},
{
"name": "id",
"type": "long",
"since": "0.0.0"
}
.....
],
"extra": [
]
}
]
}
これは Repo
という名前の擬似 case class、RepoFormats
という名前の JSON コーデック、
そして CustomJsonProtocol
という名前の全てのフォーマットを統合した統合コーデックを生成する。
/**
* This code is generated using [[http://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package gigahorse.github.response
final class Repo(
val url: String,
val name: String,
....) extends Serializable {
....
}
object Repo {
def apply(url: String, name: String, id: Long ....
}
適当な JSON パーサーと合成することで asRepo
関数を定義できる。
import gigahorse._, support.asynchttpclient.Gigahorse
import github.{ response => res }
import sjsonnew.JsonFormat
import sjsonnew.support.scalajson.unsafe.Converter
import scala.json.ast.unsafe.JValue
import java.nio.ByteBuffer
object Github {
import res.CustomJsonProtocol._
def noAuthClient = NoAuthClient()
def oauthClient(token: String) =
OAuthClient(token)
def repo(owner: String, name: String): Repos =
Repos(owner, name)
val asJson: Response => JValue =
(r: Response) => {
import sjsonnew.support.scalajson.unsafe.Parser
val buffer = ByteBuffer.wrap(r.bodyAsBytes)
Parser.parseFromByteBuffer(buffer).get
}
def as[A: JsonFormat]: Response => A =
asJson andThen Converter.fromJsonUnsafe[A]
val asRepo: Response => res.Repo = as[res.Repo]
}
これは以下のようにして使うことができる:
scala> Gigahorse.withHttp(Gigahorse.config) { http =>
val f = http.run(client(Github.repo("eed3si9n", "gigahorse")), Github.asRepo)
Await.result(f, 2.minutes)
}
res0: Repo = Repo(https://api.github.com/repos/eed3si9n/gigahorse, gigahorse, 64110679,...
詳細に関しては、gigahorse-github のソース参照。