Finagle – это RPC система от компании Twitter. Здесь рассказывается о мотивах и основных принципа дизайна, finagle README содержит больше детальной информации. Finagle помогает просто создавать надежные клиентские и серверные приложения.

Futures

Finagle использует com.twitter.util.Future1, чтобы описать отложенные операции. Futures очень выразительны и компактны, они позволяют кратко описать параллельные и последовательные операции с большой ясностью. Futures управляют значениями, которые еще не доступны, с методами для регистрации обратного вызова, который вызывается, когда значение становится доступным. Они переворачивают с ног на голову “традиционную” модель асинхронных вычислений, которые обычно предоставляют API, похожее на это:

Callback<R> cb = new Callback<R>() {
  void onComplete(R result) { … }
  void onFailure(Throwable error) { … }
}

dispatch(req, cb);

Здесь Callback.onComplete вызывается, когда результат операции dispatch становится доступен, и Callback.onFailure если операция провалилась. Во futures, мы переворачиваем поток управления:

val future = dispatch(req)
future onSuccess { value => … }
future onFailure { error => … }

Futures сами по себе это комбинаторы, с которыми мы столкнулись в различных API коллекциях. Комбинаторы работают, используя единый API, оборачивая некоторые Future новым поведением без изменения Future.

Последовательная композиция

Наиболее важным Future комбинатором является flatMap2:

def Future[A].flatMap[B](f: A => Future[B]): Future[B]

flatMap объединяет две сущности. Сигнатура метода описывает: данное успешное значение future f должно предоставить следущий Future. Результат этой операции другой Future, который завершится когда оба этих futures будут завершены. Если один из Future завершится с ошибкой, то данный Future также завершится с ошибкой. Это неявное чередование ошибок позволяет нам управлять ошибками только в тех местах, где это необходимо. flatMap это стандартное имя для данного комбинатора с заданной семантикой. В Scala есть короткая запись для этого вызова: конструкция for.

Как пример, давайте предположим, что у нас есть методы authenticate: Request -> User, и rateLimit: User -> Boolean, тогда получим следующий код:

val f = authenticate(request) flatMap { u =>
  rateLimit(u) map { r => (u, r)
}

С помощью конструкции for, мы можем написать:

val f = for {
  u <- authenticate(request)
  r <- rateLimit(u)
} yield (u, r)

используем future f: Future[(User, Boolean)], который предоставляет объект пользователь и логическое выражение, которое сигнализирует о достигнутости пользователем предела. Заметьте, как здесь требуемая последовательная композиция: rateLimit берет аргумент выходного результата authenticate

Параллельная композиция

Есть также несколько параллельных комбинаторов. Обычно они конвертируют последовательность Future-ов в Future последовательность, по-разному:

object Future {
  …
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
  def join(fs: Seq[Future[_]]): Future[Unit]
  def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}

collect самый простой вариант: берем набор Future похожего типа, мы получаем Future последовательность значений этого типа. Этот future завершится, когда все остальные futures будут завершены, или когда один из них завершится с ошибкой.

join берет последовательнсоть Future типы которых можно смешать, возвращая Future[Unit], который завершится, когда все остальные futures будут завершены (или завершится с ошибкой, если в одном из них ошибка). Это полезно для отслеживания завершения набора гетерогенных операций.

select возвращает Future, который завершится, когда первый из данных Futureзавершится, вместе с остальными незавершенными future.

В совокупности, это дает нам мощное и краткое выражение основных сетевых операций. Этот гипотетический код выполняет установку ограничений (с целью сохранения локального кеша) параллелльно с управлением запросами пользователя на серверной стороне:

def serve(request: Request): Future[Response] = {
  val userLimit: Future[(User, Boolean)] =
    for {
      user    <- auth(request)
      limited <- isLimit(user)
    } yield (user, limited)
  
  val done = 
    dispatch(request) join userLimit
  
  done flatMap { case (rep, (usr, lim)) =>
    if (lim) {
      updateLocalRateLimitCache(usr)
      Future.exception(new Exception("rate limited"))
    } else {
      Future.value(rep)
    }
  }
}

Этот гипотетический пример объединяет последовательную и параллельную композиции. Также обратите внимание, что нет явного обработчика ошибок, только конвертирование ограничения отвечает за выброс исключения. Если future выдает здесь ошибку, она автоматически распространяется до возвращаемого Future.

Service

Service это фукнция Req => Future[Rep] для запросов и типовых ответов. Service используется как на клиенте, так и на сервере: серверы реализуют Service, а клиенты используют сборщики, для создания запросов.

abstract class Service[-Req, +Rep] extends (Req => Future[Rep])

Простой HTTP клиент может делать:

service: Service[HttpRequest, HttpResponse]

val f = service(HttpRequest("/", HTTP_1_1))
f onSuccess { res =>
  println("got response", res)
} onFailure { exc =>
  println("failed :-(", exc)
}

Серверы реализуют Service:

class MyServer 
  extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    request.path match {
      case "/" => 
        Future.value(HttpResponse("root"))
      case _ => 
        Future.value(HttpResponse("default"))
    }
  }
}

Комбинировать их просто. Простейший прокси может выглядеть так:

class MyServer(client: Service[..])
  extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    client(rewriteReq(request)) map { res =>
      rewriteRes(res)
    }
  }
}

где rewriteReq и rewriteRes могут предоставить протокол перевода, например.

Фильтры

Фильтры – это service преобразователи. Они полезны как для предоставления функциональности обобщенный service, так и для производства данного service в различных состояниях.

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
  extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

Его тип лучше рассмотреть схематически:

    ((ReqIn, Service[ReqOut, RepIn]) 
         => Future[RepOut])


           (*   Service   *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]

Здесь показан способ как вы можете написать фильтр, который является механизмом задержки service.

class TimeoutFilter[Req, Rep](
    timeout: Duration, timer: util.Timer)
  extends Filter[Req, Rep, Req, Rep]
{
  def apply(
    request: Req, service: Service[Req, Rep]
  ): Future[Rep] = {
    service(request).timeout(timer, timeout) {
      Throw(new TimedoutRequestException)
    }
  }
}

Этот пример показывает как вы можете использовать аутентификацию (через сервис аутентификации) для того, чтобы конвертировать Service[AuthHttpReq, HttpRep] в Service[HttpReq, HttpRep].

class RequireAuthentication(authService: AuthService)
  extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
{
  def apply(
    req: HttpReq, 
    service: Service[AuthHttpReq, HttpRep]
  ) = {
    authService.auth(req) flatMap {
      case AuthResult(AuthResultCode.OK, Some(passport), _) =>
        service(AuthHttpReq(req, passport))
      case ar: AuthResult =>
        Future.exception(
          new RequestUnauthenticated(ar.resultCode))
    }
  }
}

Фильтры объединяются вместе с andThen. Предоставляя Service как аргумент для andThen, создающий (отфильтрованный) Service (типы представлены для иллюстрации).

val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]

val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
  authFilter andThen timeoutFilter

val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
  authenticateAndTimedOut andThen serviceRequiringAuth

Компоновщики

В конце концов, компоновщики собирают все вместе. ClientBuilder предоставляет экземпляр Service, дающий набор параметров, и ServerBuilder берущий экземпляр Service и отправляет ему входящие запросы. Для того чтобы определить тип Service, мы должны иметь Codec. Codec предоставляет нижележащий протокол реализации (например, HTTP, thrift, memcached). Оба компоновщика имеют много параметров, но их требуется совсем немного.

Ниже представлен пример вызова ClientBuilder (типы представлены для иллюстрации):

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http)
  .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
  .hostConnectionLimit(1)
  .tcpConnectTimeout(1.second)
  .retries(2)
  .reportTo(new OstrichStatsReceiver)
  .build()

Здесь создается клиент, который выравнивает нагрузку 3 данных хостов, устанавливая более чем 1 соединение на хост, и падает только после 2 неудач. Статистика собирается с помощью ostrich. Требуются следующие опции компоновщика (и они обычно присутствуют постоянно): hosts или cluster, codec и hostConnectionLimit.

val myService: Service[HttpRequest, HttpResponse] = // provided by the user
ServerBuilder()
  .codec(Http)
  .hostConnectionMaxLifeTime(5.minutes)
  .readTimeout(2.minutes)
  .name("myHttpServer")
  .bindTo(new InetSocketAddress(serverPort))
  .build(myService)

Все будет работать на порту serverPort HTTP сервера, который управляет запросами на myService. Каждое соединение держится открытым до 5 минут, и мы также требуем, чтобы запрос был отправлен в течение 2 минут. Необходимые опции ServerBuilder: name, bindTo and codec.

1 отличающийся от java.util.concurrent.Future

2 это равносильно монадическому(monadic) связыванию