Мы начинаем создавать простой распределенный поисковый движок, используя Scala, и обсуждаемый ранее Finagle фреймфорк.

Основные цели: Обзор

В широком смысле, наши цели должны включать абстракцию (возможность использовать полученную систему, без всех внутренних деталей); модульность (возможность системы делиться на маленькие, простые части, которые просты для понимания и/или могут быть легко заменены); и масштабирование (возможность возможность увеличить производительность системы простыми способами).

Система, которую мы собираемся описать состоит из трех частей: (1) клиенты, которые делают запросы к (2) серверам, которые посылают ответ на запрос, и (3) транспорт – механизм, который упаковывает эти сообщения. Обычно клиент и сервер будут находиться на разных машинах и общаться по сети через конкретный порт, но в данном примере, они будут работать на одной и той же машине (и тоже общаться с помощью портов). В нашем примере, клиенты и серверы будут написаны на Scala, и транспорт будет обрабатываться с помощью Thrift. Основной целью этого урока является показать работу простого сервера и клиента, которые могут быть расширены, чтобы обеспечить необходимый уровень производительность при масштабировании.

Обзор стандартного проекта-заготовки

Во-первых, создадим скелет проекта (“Searchbird”) с использованием scala-bootstrapper. С его помощью создается простой Finagle на основе Scala сервиса, который экспортирует из памяти хранилище ключ-значение. Сначала мы расширим его, чтобы он мог искать значения, а затем расширим его, чтобы он мог поддерживать поиск во многих, находящихся в памяти, базах, используя несколько процессов.

$ mkdir searchbird ; cd searchbird
$ scala-bootstrapper searchbird
writing build.sbt
writing config/development.scala
writing config/production.scala
writing config/staging.scala
writing config/test.scala
writing console
writing Gemfile
writing project/plugins.sbt
writing README.md
writing sbt
writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala
writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
writing src/main/scala/com/twitter/searchbird/Main.scala
writing src/main/thrift/searchbird.thrift
writing src/scripts/searchbird.sh
writing src/scripts/config.sh
writing src/scripts/devel.sh
writing src/scripts/server.sh
writing src/scripts/service.sh
writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala
writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala
writing TUTORIAL.md

Давайте сначала изучим стандартный проект scala-bootstrapper, созданный для нас. Он является шаблоном. Вы в конечном счете замените большую часть проекта, но он послужит удобным средством для старта. Проект определяет простое (но полное) хранилище ключ-значение. Конфигурация, thrift интерфейс, статистика экспорта и ведение логов все это включено изначально.

Прежде чем мы вглянем на код, давайте запустим клиент и сервер, чтобы увидеть, как все работает. Вот то, что мы делаем:

Searchbird implementation, revision 1

а вот интерфейс, который экспортирует наш сервис. Поскольку сервис Searchbird это Thrift сервис (как и большинство наших сервисов), его внешний интерфейс определен в Thrift IDL (“язык описания интерфейса”).

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)
}

Здесь все просто: наш сервис SearchbirdService экспортирует 2 RPC метода, get и put. Они включают в себя простой интерфейс для хранилища ключ-значение.

Теперь давайте запустим стандартный сервис и клиент, который подключается к этому сервису, и исследуем их через этот интерфейс. Откройте два окна, одно для сервера и одно для клиента.

В первом окне, запустите SBT в интерактивном режиме (запустите ./sbt из командной строки 1), а затем собирите и запустите проект из SBT. После этого запустится программа main в Main.scala.

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

Файл конфигурации (development.scala) создает новый сервис, привязывая сервис к порту 9999 на нашей локальной машине. Клиенты могут общаться с этим сервисом при подключении к порту 9999.

Теперь мы создадим и запустим клиент, используя предоставленный ​​@console@ шелл скрипт, который создает экземпляр SearchbirdConsoleClient (из SearchbirdConsoleClient.scala). Запустите этот сценарий в другом окне:

$ ./console 127.0.0.1 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client>

Клиентский объект client теперь подключен к порту 9999 на нашем локальном компьютере и может общаться с сервисом, который мы ранее запустили на этом порту. Теперь давайте отправим несколько запросов:

scala> client.put("marius", "Marius Eriksen")
res0: ...

scala> client.put("stevej", "Steve Jenson")
res1: ...

scala> client.get("marius")
res2: com.twitter.util.Future[String] = ...

scala> Await.result(client.get("marius"))
res3: String = Marius Eriksen

(Второй вызов Await.result() создает Future, который является возвращаемым типом client.get(), блокируя процесс до того, пока не будет получено значение.)

Сервер также экспортирует статистику времени выполнения (файл конфигурации определяет для этого порт 9900). Это удобно как для проверки отдельных серверов, а также для объединения в глобальную статистику сервиса (предоставляется также машиночитаемый JSON интерфейс). Откройте третье окно и проверьте эту статистику:

$ curl localhost:9900/stats.txt
counters:
  Searchbird/connects: 1
  Searchbird/received_bytes: 264
  Searchbird/requests: 3
  Searchbird/sent_bytes: 128
  Searchbird/success: 3
  jvm_gc_ConcurrentMarkSweep_cycles: 1
  jvm_gc_ConcurrentMarkSweep_msec: 15
  jvm_gc_ParNew_cycles: 24
  jvm_gc_ParNew_msec: 191
  jvm_gc_cycles: 25
  jvm_gc_msec: 206
gauges:
  Searchbird/connections: 1
  Searchbird/pending: 0
  jvm_fd_count: 135
  jvm_fd_limit: 10240
  jvm_heap_committed: 85000192
  jvm_heap_max: 530186240
  jvm_heap_used: 54778640
  jvm_nonheap_committed: 89657344
  jvm_nonheap_max: 136314880
  jvm_nonheap_used: 66238144
  jvm_num_cpus: 4
  jvm_post_gc_CMS_Old_Gen_used: 36490088
  jvm_post_gc_CMS_Perm_Gen_used: 54718880
  jvm_post_gc_Par_Eden_Space_used: 0
  jvm_post_gc_Par_Survivor_Space_used: 1315280
  jvm_post_gc_used: 92524248
  jvm_start_time: 1345072684280
  jvm_thread_count: 16
  jvm_thread_daemon_count: 7
  jvm_thread_peak_count: 16
  jvm_uptime: 1671792
labels:
metrics:
  Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393)
  Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)

В дополнение к нашей статистике сервиса, мы также получаем общую статистику JVM, которая часто полезна.

Теперь давайте заглянем в код, который реализует конфигурацию, сервер и клиент.

…/config/SearchbirdServiceConfig.scala

Конфигурация является Scala трейтом, который имеет метод apply: RuntimeEnvironment => T для некоторых T, которые мы хотим создать. В этом смысле конфигурации – это “фабрики”. Во время выполнения файла конфигурации выполняется как скрипт (с помощью компилятора Scala в качестве библиотеки), и ожидается, что будет получен объект конфигурации.RuntimeEnvironment является объектом, запрашивающим различные параметры во время выполнения (флаги командной строки, версия JVM, временные метки сборки и т.д.).

SearchbirdServiceConfig определяет такой же класс. Он определяет параметры конфигурации вместе с их значениями по умолчанию. (Finagle поддерживает общие системы отслеживания, которые мы можем игнорировать для целей настоящего учебника; Zipkin распределенная система отслеживания, является коллектором/агрегатором таких путей следования.)

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}

В нашем случае, мы хотим создать SearchbirdService.ThriftServer. Он является серверным типом, который создается thrift-ым генератором кода 2.

…/Main.scala

При наборе “run” в sbt консоли вызывается main, который настраивает и создает экземпляр этого сервера. Считывается конфигурация (указанная в development.scala и используется как аргумент внутри “run”), создает SearchbirdService.ThriftServer и запускает его. RuntimeEnvironment.loadRuntimeConfig выполняет файл конфигурации и вызывает метод apply, передавая себя в качестве аргумента3.

object Main {
  private val log = Logger.get(getClass)

  def main(args: Array[String]) {
    val runtime = RuntimeEnvironment(this, args)
    val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer]
    try {
      log.info("Starting SearchbirdService")
      server.start()
    } catch {
      case e: Exception =>
        log.error(e, "Failed starting SearchbirdService, exiting")
        ServiceTracker.shutdown()
        System.exit(1)
    }
  }
}
…/SearchbirdServiceImpl.scala

Это основа сервиса: мы расширяем SearchbirdService.ThriftServer с помощью нашей собственной реализации. Напомним, что SearchbirdService.ThriftServer был создан для нас thrift-ым генератором кода. Он генерирует метод Scala используя thrift метод. В нашем примере, сгенерированный интерфейс выглядит так:

trait SearchbirdService {
  def put(key: String, value: String): Future[Void]
  def get(key: String): Future[String]
}

Future[Value] возвращаются вместо непосредственных значений, так что их вычисление может быть отложено (finagle документация имеет более подробную информацию о Future). Для целей настоящего учебника, единственное, что вам нужно знать о Future, то что вы можете получить его значение с помощью get().

Стандартная реализация хранилища ключ-значение предоставляемая scala-bootstrapper очень проста: она обеспечивает структуру данных database и вызовы get и put, которые имеют доступ к этой структуре данных.

class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort
  override val tracerFactory = config.tracerFactory

  val database = new mutable.HashMap[String, String]()

  def get(key: String) = {
    database.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    database(key) = value
    Future.Unit
  }

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}

Результатом является простой thrift интерфейс для Scala HashMap.

Простой поисковый движок

Теперь мы расширим наш пример, чтобы создать простой поисковой движок. Затем мы расширим его так, чтобы он стал распределенным поисковым движком, состоящим из нескольких распределенных частей, так что мы сможем вместить данных больше, чем может поместиться в памяти одной машины.

Для простоты, мы будем расширять наш нынешний сервис thrift по частям, реализуя поддержку операции поиска. Моделью использования является put документы в поисковую систему, где каждый документ содержит ряд лексем (слов), тогда мы можем искать в строке лексем, чтобы вернуть все документы, которые содержат все лексемы в наборе. Архитектура совпадает с предыдущим примером, но для добавления вызываем search.

Searchbird implementation, revision 2

Чтобы реализовать такой поисковый движок, сделайте следующие изменения в двух файлах:

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)

  list<string> search(1: string query)
}

Мы добавили метод search, который ищет в текущей хеш-таблице, возвращая список ключей, значения которых соответствуют запросу. Реализация также проста:

…/SearchbirdServiceImpl.scala

Большинство наших изменения происходят в этом файле.

В настоящее время database hashmap содержит начальный индекс, который включает ключ на документ. Мы переименуем его в forward и добавим вторую карту для обратного индекса (который отображает лексему на набор документов, которые содержат эту лексему). Таким образом, внутри SearchbirdServiceImpl.scala, заменим определение database с помощью:

val forward = new mutable.HashMap[String, String]
  with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
  with mutable.SynchronizedMap[String, Set[String]]

Внутри вызова get, заменим database с помощью forward, иначе get останется тем же (он выполняется перед поиском). Тем не менее put требует изменений: мы также должны заполнить обратный индекс для каждой лексемы в документе путем добавления ключа документа в список, связанный с этой лексемой. Замените вызов put с помощью следующего кода. Учитывая особый знак поиска, теперь мы можем использовать reverse карту, чтобы просмотреть документы.

def put(key: String, value: String) = {
  log.debug("put %s", key)

  forward(key) = value

  // serialize updaters
  synchronized {
    value.split(" ").toSet foreach { token =>
      val current = reverse.getOrElse(token, Set())
      reverse(token) = current + key
    }
  }

  Future.Unit
}

Заметим, что (хотя HashMap является потокобезопасным) только один поток может обновлять reverse карту за раз, чтобы убедиться, что чтение-модификация-запись конкретной записи – это атомарные операции. (Код является чрезмерно консервативным, он блокирует всю карту, вместо блокировки каждой отдельной операции получение-модификация-запись). Также обратите внимание на использование Set как структуры данных, это гарантирует, что если похожая лексема появляется дважды в документе, то она будет обработана только один раз с помощью foreach.

Реализация имеет проблему, которая остается в качестве упражнения для читателя: когда мы перезаписываем ключ с новым документом, мы не удаляем все ссылки на старый документ в обратном индексе.

Вернемся к поисковой системе: используем новый метод search. Он должен разметить свой ​​запрос, просмотреть все соответствующие документы, а затем находит общее в этих списках. После этого получаем список документов, которые содержат все лексемы в запросе. Все это просто выразить в Scala, добавив это к классу SearchbirdServiceImpl:

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
  intersected.toList
}

Несколько вещей, которые важны в этом небольшом фрагменте кода. При построении списка, если ключ (token) не найден, getOrElse будет отдавать значение второго параметра (в данном случае, пустой Set). Мы выполняем пересечение, используя left-reduce. Специфический способ, reduceLeftOption, не будет пытаться выполнить reduce, если hits пуст, возвращая вместо этого None. Он позволяет указать значение по умолчанию, вместо исключения. На самом деле, это эквивалентно:

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  if (hits.isEmpty)
    Nil
  else
    hits reduceLeft { _ & _ } toList
}

Какой вариант использовать это в основном дело вкуса, хотя функциональный стиль часто избегает условные конструкции для разумных значений по умолчанию.

Теперь мы можем экспериментировать с нашей новой реализацией с помощью консоли. Снова запустите сервер:

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

и потом из директории searchbird, запустите клиент:

$ ./console 127.0.0.1 9999
...
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client>

Вставьте следующее описание лекции в консоль:

client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
client.put("collections", " lists maps functional combinators (map foreach filter zip")
client.put("pattern", " more functions! partialfunctions more pattern")
client.put("type", " basic types and type polymorphism type inference variance bounds")
client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
client.put("simple", " all about sbt the standard scala build")
client.put("more", " tour of the scala collections")
client.put("testing", " write tests with specs a bdd testing framework for")
client.put("concurrency", " runnable callable threads futures twitter")
client.put("java", " java interop using scala from")
client.put("searchbird", " building a distributed search engine using")

Теперь мы можем выполнить несколько запросов, которые возвращают ключи документов, которые содержат условия поиска.

> Await.result(client.search("functions"))
res12: Seq[String] = ArrayBuffer(basics)

> Await.result(client.search("java"))
res13: Seq[String] = ArrayBuffer(java)

> Await.result(client.search("java scala"))
res14: Seq[String] = ArrayBuffer(java)

> Await.result(client.search("functional"))
res15: Seq[String] = ArrayBuffer(collections)

> Await.result(client.search("sbt"))
res16: Seq[String] = ArrayBuffer(simple)

> Await.result(client.search("types"))
res17: Seq[String] = ArrayBuffer(type, advanced)

Напомним, что если вызов возвращает Future, мы должны использовать блокировку вызова get() для получения значения, содержащегося в этом future. Мы можем использовать команду Future.collect, чтобы сделать несколько одновременных запросов и подождать, пока все они завершатся успешно:

> import com.twitter.util.{Await, Future}
...
> Await.result(Future.collect(Seq(
    client.search("types"),
    client.search("sbt"),
    client.search("functional")
  )))
res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))

Делаем наш сервис распределенным

На одной машине, наши простой поисковый движок, находящийся в памяти, не сможет искать фрагменты больше, чем размер оперативной памяти. Мы будем теперь пытаться исправить эту ситуацию, распределяя узлы с помощью простой схемы распределения (шардинга). Вот блок-схема:

Distributed Searchbird service

Абстрагирование

В помощь нашему делу, мы введем другую абстаркцию -Index- для того чтобы отделить раелизацию индекса от SearchbirdService. Сделать это просто. Мы начнем с добавления файла Index в сборку (create the file searchbird/src/main/scala/com/twitter/searchbird/Index.scala):

…/Index.scala
package com.twitter.searchbird

import scala.collection.mutable
import com.twitter.util._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec

trait Index {
  def get(key: String): Future[String]
  def put(key: String, value: String): Future[Unit]
  def search(key: String): Future[List[String]]
}

class ResidentIndex extends Index {
  val log = Logger.get(getClass)

  val forward = new mutable.HashMap[String, String]
    with mutable.SynchronizedMap[String, String]
  val reverse = new mutable.HashMap[String, Set[String]]
    with mutable.SynchronizedMap[String, Set[String]]

  def get(key: String) = {
    forward.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)

    forward(key) = value

    // admit only one updater.
    synchronized {
      (Set() ++ value.split(" ")) foreach { token =>
        val current = reverse.get(token) getOrElse Set()
        reverse(token) = current + key
      }
    }

    Future.Unit
  }

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
    intersected.toList
  }
}

Сейчас мы превратили наш thrift сервис в простой механизм диспетчеризации: он обеспечивает thrift интерфейс для любого экземпляра Index. Это мощная абстракция, потому что она отделяет реализацию сервиса от реализации индекса. Сервис не должен знать никаких подробностей об основных индексах, индекс может быть локальным или удаленным, или может быть составной частью многих удаленных индексов, но сервис не заботится об этом, и реализация индекса может изменяться без изменения сервиса.

Замените определение класса SearchbirdServiceImpl тем, что ниже (которая уже не содержит подробностей реализации индекса и гораздо проще). Обратите внимание, инициализация сервера теперь принимает второй аргумент Index.

…/SearchbirdServiceImpl.scala
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort

  def get(key: String) = index.get(key)
  def put(key: String, value: String) =
    index.put(key, value) flatMap { _ => Future.Unit }
  def search(query: String) = index.search(query)

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}
…/config/SearchbirdServiceConfig.scala

Обновите ваш вызов apply в SearchbirdServiceConfig соответственно:

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}

Мы создали нашу простую распределенную систему, в которой есть один главный узел, который координирует запросы к его дочерним узлам. Для того чтобы добиться этого, нам нужно два новых типа Index. Один представляет собой удаленный индекс, другой сводный индекс нескольких других экземпляров Index. Таким образом, мы можем построить распределенный индекс, определяя экземпляр сводного индекса для удаленных индексов. Заметим, что оба индекса типы Index имеют одинаковый интерфейс, так что серверам не нужно знать, является ли индекс, к которому они подключаются удаленным или составным.

…/Index.scala

В Index.scala, определим CompositeIndex:

class CompositeIndex(indices: Seq[Index]) extends Index {
  require(!indices.isEmpty)

  def get(key: String) = {
    val queries = indices.map { idx =>
      idx.get(key) map { r => Some(r) } handle { case e => None }
    }

    Future.collect(queries) flatMap { results =>
      results.find { _.isDefined } map { _.get } match {
        case Some(v) => Future.value(v)
        case None => Future.exception(SearchbirdException("No such key"))
      }
    }
  }

  def put(key: String, value: String) =
    Future.exception(SearchbirdException("put() not supported by CompositeIndex"))

  def search(query: String) = {
    val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } }
    Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
  }
}

Сводный индекс работает с набором базовых экземпляров Index. Обратите внимание, что он не заботится, как они на самом деле реализованы. Этот тип композиции обеспечивает большую гибкость в построении различных схем запросов. Мы не определяем схему распределения, и сводный индекс не поддерживает операции put. Они выполняются непосредственно дочерними узлами. get реализуется с помощью запросов всех наших дочерних узлов и выбирается первый успешный результат. Если таковых нет, то выбрасывается исключение. Отметим, что поскольку сообщается отсутствие значения, выбрасывая исключение, мы handle (управляем) этим с помощью Future, превращая любое исключение в занчение None. В реальной системе, мы бы, вероятно, имели соответствующие коды ошибок для отсутствующих значений, а не использовали исключения. Исключения являются удобными и целесообразными для создания прототипов, но в других случаях они не подходят. Для того, чтобы отличить реальное исключение и отсутствующее значение, мне нужно посмотреть само исключение. Скорее всего, это лучший способ для указания различий непосредственно в типе возвращаемого значения.

search работает похожим образом, как и раньше. Вместо того, чтобы взять первый результат, мы объединяем их, обеспечивая их уникальность с помощью конструкции Set.

RemoteIndex предоставляет интерфейс Index для удаленного сервера.

class RemoteIndex(hosts: String) extends Index {
  val transport = ClientBuilder()
    .name("remoteIndex")
    .hosts(hosts)
    .codec(ThriftClientFramedCodec())
    .hostConnectionLimit(1)
    .timeout(500.milliseconds)
    .build()
  val client = new SearchbirdService.FinagledClient(transport)

  def get(key: String) = client.get(key)
  def put(key: String, value: String) = client.put(key, value) map { _ => () }
  def search(query: String) = client.search(query) map { _.toList }
}

Это создает finagle thrift клиент с некоторыми подходящими значениями по умолчанию и прокси вызовами, незначительно меняя типы.

Собираем все вместе

Теперь у нас есть все, что нам нужно. Прежде нужно настроить конфигурацию для того, чтобы иметь возможность вызвать данный узел либо как главный, либо как узел разделения данных. Для того, чтобы сделать это, мы перечислим шарды (распределенные узлы) в нашей системе путем создания нового элемента конфигурации. Мы также должны добавить Index аргумент в наш экземпляр SearchbirdServiceImpl. Мы будем использовать аргументы командной строки (напомним, что Config имеет к ним доступ), чтобы запустить сервер в любом режиме.

…/config/SearchbirdServiceConfig.scala
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var shards: Seq[String] = Seq()

  def apply(runtime: RuntimeEnvironment) = {
    val index = runtime.arguments.get("shard") match {
      case Some(arg) =>
        val which = arg.toInt
        if (which >= shards.size || which < 0)
          throw new Exception("invalid shard number %d".format(which))

        // override with the shard port
        val Array(_, port) = shards(which).split(":")
        thriftPort = port.toInt

        new ResidentIndex

      case None =>
        require(!shards.isEmpty)
        val remotes = shards map { new RemoteIndex(_) }
        new CompositeIndex(remotes)
    }

    new SearchbirdServiceImpl(this, index)
  }
}

Теперь будем настраивать свою конфигурацию: добавляем инициализацию “шардов” в экземпляр SearchbirdServiceConfig (мы можем общаться с шардом 0 через порт 9000, с шардом 1 через порт 9001, и так далее).

config/development.scala
new SearchbirdServiceConfig {
  // Add your own config here
  shards = Seq(
    "localhost:9000",
    "localhost:9001",
    "localhost:9002"
  )
  ...

Закомментируйте строку admin.httpPort (мы не хотим, чтобы были запущены одни и теже сервисы на данной машине, каждый из которых пытается открыть один и тот же порт):

  // admin.httpPort = 9900

Теперь, если мы вызываем наш сервер без каких-либо аргументов, он стартует как главный узел, который общается со всеми даннымы шардами. Если мы зададим аргумент с номером шарда, то сервер запускается на порту, который принадлежит этому шарду.

Давайте попробуем! Мы запустим 3 сервиса: 2 шарда и 1 главный узел. Во-первых компилируем изменения:

$ ./sbt
> compile
...
> exit

Потом запускаем 3 сервера:

$ ./sbt 'run -f config/development.scala -D shard=0'
$ ./sbt 'run -f config/development.scala -D shard=1'
$ ./sbt 'run -f config/development.scala'

Вы можете запустить их либо в 3 различных окнах, либо (в одном окне) каждый будет ждать своей очереди для запуска, нажмите Ctrl-Z, если вы хотите приостановить сервис, или используйте bg, чтобы запустить его в фоновом режиме.

Мы будем взаимодействовать с ним через консоль. Во-первых, давайте заполним данные на двух шардах. Запускаем из директории searchbird:

$ ./console localhost 9000
...
> client.put("fromShardA", "a value from SHARD_A")
> client.put("hello", "world")
$ ./console localhost 9001
...
> client.put("fromShardB", "a value from SHARD_B")
> client.put("hello", "world again")

Вы можете выйти из этих сеансов консоли, как только вы их завершите. Теперь сделаем запрос к нашей базе данных на главном узле (порт 9999):

$ ./console localhost 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999
'client' is bound to your thrift client.

finagle-client> Await.result(client.get("hello"))
res0: String = world

finagle-client> Await.result(client.get("fromShardC"))
SearchbirdException(No such key)
...

finagle-client> Await.result(client.get("fromShardA"))
res2: String = a value from SHARD_A

finagle-client> Await.result(client.search("hello"))
res3: Seq[String] = ArrayBuffer()

finagle-client> Await.result(client.search("world"))
res4: Seq[String] = ArrayBuffer(hello)

finagle-client> Await.result(client.search("value"))
res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)

Этот дизайн имеет несколько абстракций данных, которые позволяют использовать более модульную и масштабируемую реализацию:

Возможные улучшения этой реализации будут включать:

1 Локальный скрипт ./sbt гарантирует, что версия SBT не изменяет то, что мы знаем обо всех имеющися библиотеках.

2 В target/gen-scala/com/twitter/searchbird/SearchbirdService.scala.

3 Смотрите Ostrich’s “README”: https://github.com/twitter/ostrich/blob/master/README.md для получения дополнительной информации.