Finagle – это RPC система от компании Twitter. Здесь рассказывается о мотивах и основных принципа дизайна, finagle README содержит больше детальной информации. Finagle помогает просто создавать надежные клиентские и серверные приложения.
Finagle использует com.twitter.util.Future
1, чтобы описать отложенные операции. Futures очень выразительны и компактны, они позволяют кратко описать параллельные и последовательные операции с большой ясностью. Futures управляют значениями, которые еще не доступны, с методами для регистрации обратного вызова, который вызывается, когда значение становится доступным. Они переворачивают с ног на голову “традиционную” модель асинхронных вычислений, которые обычно предоставляют API, похожее на это:
Callback<R> cb = new Callback<R>() { void onComplete(R result) { … } void onFailure(Throwable error) { … } } dispatch(req, cb);
Здесь Callback.onComplete
вызывается, когда результат операции dispatch
становится доступен, и Callback.onFailure
если операция провалилась. Во futures, мы переворачиваем поток управления:
val future = dispatch(req) future onSuccess { value => … } future onFailure { error => … }
Futures сами по себе это комбинаторы, с которыми мы столкнулись в различных API коллекциях. Комбинаторы работают, используя единый API, оборачивая некоторые Future
новым поведением без изменения Future
.
Наиболее важным Future
комбинатором является flatMap
2:
def Future[A].flatMap[B](f: A => Future[B]): Future[B]
flatMap
объединяет две сущности. Сигнатура метода описывает: данное успешное значение future f
должно предоставить следущий Future
. Результат этой операции другой Future
, который завершится когда оба этих futures будут завершены. Если один из Future
завершится с ошибкой, то данный Future
также завершится с ошибкой. Это неявное чередование ошибок позволяет нам управлять ошибками только в тех местах, где это необходимо. flatMap
это стандартное имя для данного комбинатора с заданной семантикой. В Scala есть короткая запись для этого вызова: конструкция for
.
Как пример, давайте предположим, что у нас есть методы authenticate: Request -> User
, и rateLimit: User -> Boolean
, тогда получим следующий код:
val f = authenticate(request) flatMap { u => rateLimit(u) map { r => (u, r) }
С помощью конструкции for, мы можем написать:
val f = for { u <- authenticate(request) r <- rateLimit(u) } yield (u, r)
используем future f: Future[(User, Boolean)]
, который предоставляет объект пользователь и логическое выражение, которое сигнализирует о достигнутости пользователем предела. Заметьте, как здесь требуемая последовательная композиция: rateLimit
берет аргумент выходного результата authenticate
Есть также несколько параллельных комбинаторов. Обычно они конвертируют последовательность Future
-ов в Future
последовательность, по-разному:
object Future { … def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] def join(fs: Seq[Future[_]]): Future[Unit] def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])] }
collect
самый простой вариант: берем набор Future
похожего типа, мы получаем Future
последовательность значений этого типа. Этот future завершится, когда все остальные futures будут завершены, или когда один из них завершится с ошибкой.
join
берет последовательнсоть Future
типы которых можно смешать, возвращая Future[Unit]
, который завершится, когда все остальные futures будут завершены (или завершится с ошибкой, если в одном из них ошибка). Это полезно для отслеживания завершения набора гетерогенных операций.
select
возвращает Future
, который завершится, когда первый из данных Future
завершится, вместе с остальными незавершенными future.
В совокупности, это дает нам мощное и краткое выражение основных сетевых операций. Этот гипотетический код выполняет установку ограничений (с целью сохранения локального кеша) параллелльно с управлением запросами пользователя на серверной стороне:
def serve(request: Request): Future[Response] = { val userLimit: Future[(User, Boolean)] = for { user <- auth(request) limited <- isLimit(user) } yield (user, limited) val done = dispatch(request) join userLimit done flatMap { case (rep, (usr, lim)) => if (lim) { updateLocalRateLimitCache(usr) Future.exception(new Exception("rate limited")) } else { Future.value(rep) } } }
Этот гипотетический пример объединяет последовательную и параллельную композиции. Также обратите внимание, что нет явного обработчика ошибок, только конвертирование ограничения отвечает за выброс исключения. Если future выдает здесь ошибку, она автоматически распространяется до возвращаемого Future
.
Service
это фукнция Req => Future[Rep]
для запросов и типовых ответов. Service
используется как на клиенте, так и на сервере: серверы реализуют Service
, а клиенты используют сборщики, для создания запросов.
abstract class Service[-Req, +Rep] extends (Req => Future[Rep])
Простой HTTP клиент может делать:
service: Service[HttpRequest, HttpResponse] val f = service(HttpRequest("/", HTTP_1_1)) f onSuccess { res => println("got response", res) } onFailure { exc => println("failed :-(", exc) }
Серверы реализуют Service
:
class MyServer extends Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { request.path match { case "/" => Future.value(HttpResponse("root")) case _ => Future.value(HttpResponse("default")) } } }
Комбинировать их просто. Простейший прокси может выглядеть так:
class MyServer(client: Service[..]) extends Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { client(rewriteReq(request)) map { res => rewriteRes(res) } } }
где rewriteReq
и rewriteRes
могут предоставить протокол перевода, например.
Фильтры – это service преобразователи. Они полезны как для предоставления функциональности обобщенный service, так и для производства данного service в различных состояниях.
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn] extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])
Его тип лучше рассмотреть схематически:
((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]) (* Service *) [ReqIn -> (ReqOut -> RepIn) -> RepOut]
Здесь показан способ как вы можете написать фильтр, который является механизмом задержки service.
class TimeoutFilter[Req, Rep]( timeout: Duration, timer: util.Timer) extends Filter[Req, Rep, Req, Rep] { def apply( request: Req, service: Service[Req, Rep] ): Future[Rep] = { service(request).timeout(timer, timeout) { Throw(new TimedoutRequestException) } } }
Этот пример показывает как вы можете использовать аутентификацию (через сервис аутентификации) для того, чтобы конвертировать Service[AuthHttpReq, HttpRep]
в Service[HttpReq, HttpRep]
.
class RequireAuthentication(authService: AuthService) extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] { def apply( req: HttpReq, service: Service[AuthHttpReq, HttpRep] ) = { authService.auth(req) flatMap { case AuthResult(AuthResultCode.OK, Some(passport), _) => service(AuthHttpReq(req, passport)) case ar: AuthResult => Future.exception( new RequestUnauthenticated(ar.resultCode)) } } }
Фильтры объединяются вместе с andThen
. Предоставляя Service
как аргумент для andThen
, создающий (отфильтрованный) Service
(типы представлены для иллюстрации).
val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep] val serviceRequiringAuth: Service[AuthHttpReq, HttpRep] val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] = authFilter andThen timeoutFilter val authenticatedTimedOutService: Service[HttpReq, HttpRep] = authenticateAndTimedOut andThen serviceRequiringAuth
В конце концов, компоновщики собирают все вместе. ClientBuilder
предоставляет экземпляр Service
, дающий набор параметров, и ServerBuilder
берущий экземпляр Service
и отправляет ему входящие запросы. Для того чтобы определить тип Service
, мы должны иметь Codec
. Codec предоставляет нижележащий протокол реализации (например, HTTP, thrift, memcached). Оба компоновщика имеют много параметров, но их требуется совсем немного.
Ниже представлен пример вызова ClientBuilder
(типы представлены для иллюстрации):
val client: Service[HttpRequest, HttpResponse] = ClientBuilder() .codec(Http) .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003") .hostConnectionLimit(1) .tcpConnectTimeout(1.second) .retries(2) .reportTo(new OstrichStatsReceiver) .build()
Здесь создается клиент, который выравнивает нагрузку 3 данных хостов, устанавливая более чем 1 соединение на хост, и падает только после 2 неудач. Статистика собирается с помощью ostrich. Требуются следующие опции компоновщика (и они обычно присутствуют постоянно): hosts
или cluster
, codec
и hostConnectionLimit
.
val myService: Service[HttpRequest, HttpResponse] = // provided by the user ServerBuilder() .codec(Http) .hostConnectionMaxLifeTime(5.minutes) .readTimeout(2.minutes) .name("myHttpServer") .bindTo(new InetSocketAddress(serverPort)) .build(myService)
Все будет работать на порту serverPort
HTTP сервера, который управляет запросами на myService
. Каждое соединение держится открытым до 5 минут, и мы также требуем, чтобы запрос был отправлен в течение 2 минут. Необходимые опции ServerBuilder
: name
, bindTo
and codec
.
1 отличающийся от java.util.concurrent.Future
2 это равносильно монадическому(monadic) связыванию