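A Source-Code Walkthrough of the Elasticsearch (ES) Startup Process

Overview

This post follows, at the source-code level, how Elasticsearch (ES from here on) starts up. The aim is to answer questions about the ES source and to build skill by analyzing open-source code.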


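What we will look at

Specifically, we will look at the following.

  • We follow the code that starts ES, step by step.

  • Because the startup covers a very large amount of code, we narrow the scope to how the service behaves once ES is up and curl http://localhost:9200/ is called. (Figure: calling ES)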

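Why look at this

  • Follow the code in ES startup order and understand it.

  • Once you have traced the code ES runs for one REST call, the same approach carries over to other code paths. (For example, it helps when tracing document retrieval, indexing, or search.)

  • Of the REST call paths, "curl http://localhost:9200/" is the simplest, so that is the one we examine.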

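Summary for busy readers #1

Some of you will want to read the code below, but if you only want the key points of this post, the following is all you need.

  • The core of the ES startup process is the Node.java constructor and its start() method.

  • A simple way to read the ES source is to follow the RestxxxAction classes in the org.elasticsearch.rest.action package one feature at a time. (xxx stands for a * wildcard.)

  • For the example in this post, start from RestMainAction.

  • Further examples of following the code by feature:

    • Indexing: RestIndexAction

    • Retrieval: RestGetAction

    • Search: RestSearchAction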
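To make the RestxxxAction idea concrete, here is a minimal sketch of a REST layer that maps a method and path to a handler. It is not the ES implementation: `MiniRestController`, `registerHandler`, `dispatch`, and the response strings are hypothetical names chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a REST controller: maps "METHOD path" to a handler.
public class MiniRestController {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Each "action" registers itself for one method/path pair.
    public void registerHandler(String method, String path, Function<String, String> handler) {
        handlers.put(method + " " + path, handler);
    }

    // Looks up the handler for the request and runs it; unknown routes get a fallback message.
    public String dispatch(String method, String path, String body) {
        Function<String, String> handler = handlers.get(method + " " + path);
        if (handler == null) {
            return "no handler for " + method + " " + path;
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        MiniRestController controller = new MiniRestController();
        // Plays the role RestMainAction plays for "/" in this post (assumed analogy).
        controller.registerHandler("GET", "/", body -> "main action");
        System.out.println(controller.dispatch("GET", "/", ""));
        System.out.println(controller.dispatch("GET", "/missing", ""));
    }
}
```

Reading one action class per feature works because each one is, roughly, a single entry in a table like this.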

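Summary for busy readers #2 (figures)

The ES startup process and the simple call described in this post are handled as shown below.

Note that many parts are omitted to make this post easier to follow. (Nothing in the figures misdescribes the actual behavior, though.)

  • ES startup process

  • When "curl localhost:9200" is called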

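How this post explains things

  • Starting from the entry point, the post lists the source files in the order they were followed in the IDE and adds comments where they seem necessary.

  • To keep the post at a manageable length, package names, imports, and methods that are not needed have been removed from the source code.

  • Even when a method takes arguments, it is written as methodName() for convenience.

  • The main purpose of the post is to explain the flow of the ES source, so not every piece of code is explained in detail.

  • Class names and method names are shown in bold.

Let's start the source analysis.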

Elasticsearch.java

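The entry point for running ES is quite intuitive: the application is launched through a main method, which any Java developer will recognize.

In the code below, the first main method calls the second main method.

Calling elasticsearch.main(args, terminal) invokes main(String[] args, Terminal terminal) of the Command class, the parent class of SettingCommand.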

class Elasticsearch extends SettingCommand {
  public static void main(final String[] args) throws Exception {
      // we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
      // presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
      System.setSecurityManager(new SecurityManager() {
          @Override
          public void checkPermission(Permission perm) {
              // grant all permissions so that we can later set the security manager to the one that we want
          }
      });
      final Elasticsearch elasticsearch = new Elasticsearch();
      int status = main(args, elasticsearch, Terminal.DEFAULT);
      if (status != ExitCodes.OK) {
          exit(status);
      }
  }
  static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
      return elasticsearch.main(args, terminal);
  }
}

Command.java

The main job of the main method is simply to call mainWithoutErrorHandling(args, terminal).

mainWithoutErrorHandling parses the arguments into option values and calls execute(terminal, options). That method is implemented in SettingCommand.java, which is a subclass of Command and the parent class of Elasticsearch.

public abstract class Command {
  /** Parses options for this command from args and executes it. */
    public final int main(String[] args, Terminal terminal) throws Exception {
        // initialize default for es.logger.level because we will not read the log4j2.properties
        final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name());
        final Settings settings = Settings.builder().put("logger.level", loggerLevel).build();
        LogConfigurator.configureWithoutConfig(settings);
        try {
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            printHelp(terminal);
            terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } catch (UserException e) {
            if (e.exitCode == ExitCodes.USAGE) {
                printHelp(terminal);
            }
            terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return e.exitCode;
        }
        return ExitCodes.OK;
    }
    /**
     * Executes the command, but all errors are thrown.
     */
    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);
        if (options.has(helpOption)) {
            printHelp(terminal);
            return;
        }
        if (options.has(silentOption)) {
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }
        execute(terminal, options);
    }
}
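
The split between an error-handling wrapper and an abstract execute step can be sketched on its own. This is a simplified illustration rather than the ES classes: `MiniCommand`, `UserError`, `run`, and the exit-code values are assumptions made for the example.

```java
// Hypothetical, simplified mirror of the Command.main() flow shown above.
public abstract class MiniCommand {
    public static final int OK = 0;     // placeholder exit code
    public static final int USAGE = 64; // placeholder exit code

    // Signals a user-facing failure that carries its own exit code.
    public static class UserError extends Exception {
        final int exitCode;

        public UserError(int exitCode, String message) {
            super(message);
            this.exitCode = exitCode;
        }
    }

    // Error-handling wrapper: turns exceptions into exit codes.
    public final int run(String[] args) {
        try {
            runWithoutErrorHandling(args);
        } catch (UserError e) {
            System.err.println("ERROR: " + e.getMessage());
            return e.exitCode;
        }
        return OK;
    }

    // Validates the arguments (real option parsing is omitted), then delegates to the subclass.
    void runWithoutErrorHandling(String[] args) throws UserError {
        for (String arg : args) {
            if (arg.isEmpty()) {
                throw new UserError(USAGE, "empty argument");
            }
        }
        execute(args);
    }

    // Subclasses supply the actual work.
    protected abstract void execute(String[] args) throws UserError;

    public static void main(String[] args) {
        MiniCommand command = new MiniCommand() {
            @Override
            protected void execute(String[] a) {
                System.out.println("executing with " + a.length + " argument(s)");
            }
        };
        System.out.println("exit code: " + command.run(args));
    }
}
```

Keeping the wrapper final and the work abstract means every subclass gets the same exit-code handling without repeating it.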

SettingCommand.java

The main logic builds settings from the options and calls execute(terminal, options, settings). That method is abstract, so in practice the call ends up in Elasticsearch.execute(terminal, options, settings).

public abstract class SettingCommand extends Command {
    @Override
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "Setting [" + kvp.key + "] must not be empty");
            }
            settings.put(kvp.key, kvp.value);
        }
        putSystemPropertyIfSettingIsMissing(settings, "path.conf", "es.path.conf");
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        execute(terminal, options, settings);
    }
}
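
The body of putSystemPropertyIfSettingIsMissing is not shown in this post. Going only by its name, the idea of falling back to a system property when a setting is absent might look like the sketch below. `SettingsFallback`, `fillFromSystemProperty`, and the `demo.*` property names are hypothetical, and the real helper may behave differently, for example in how it treats a value supplied both ways.

```java
import java.util.HashMap;
import java.util.Map;

public class SettingsFallback {
    // Copies a system property into the settings map only when the setting is absent.
    // Assumed behavior for illustration; not taken from the ES source.
    public static void fillFromSystemProperty(Map<String, String> settings, String setting, String propertyKey) {
        String value = System.getProperty(propertyKey);
        if (value != null && !settings.containsKey(setting)) {
            settings.put(setting, value);
        }
    }

    public static void main(String[] args) {
        System.setProperty("demo.path.home", "/opt/demo");
        Map<String, String> settings = new HashMap<>();
        settings.put("path.data", "/var/data");

        // path.home is missing, so it is filled from the system property.
        fillFromSystemProperty(settings, "path.home", "demo.path.home");
        // path.data is already set and demo.path.data is undefined, so nothing changes.
        fillFromSystemProperty(settings, "path.data", "demo.path.data");

        System.out.println(settings);
    }
}
```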

Elasticsearch.java

We return to Elasticsearch.java and look at its execute method.

After validating the options and handling the version option, it calls init(daemonize, pidFile, quiet, settings).

init calls Bootstrap.init(!daemonize, pidFile, quiet, esSettings), which begins booting the ES node.

class Elasticsearch extends SettingCommand {
    @Override
    protected void execute(Terminal terminal, OptionSet options, Map<String, String> settings) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        if (options.has(versionOption)) {
            if (options.has(daemonizeOption) || options.has(pidfileOption)) {
                throw new UserException(ExitCodes.USAGE, "Elasticsearch version option is mutually exclusive with any other option");
            }
            terminal.println("Version: " + org.elasticsearch.Version.CURRENT
                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                    + ", JVM: " + JvmInfo.jvmInfo().version());
            return;
        }
        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);
        try {
            init(daemonize, pidFile, quiet, settings);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }
    void init(final boolean daemonize, final Path pidFile, final boolean quiet, final Map<String, String> esSettings)
        throws NodeValidationException, UserException {
        try {
            Bootstrap.init(!daemonize, pidFile, quiet, esSettings);
        } catch (BootstrapException | RuntimeException e) {
            // format exceptions to the console in a special way
            // to avoid 2MB stacktraces from guice, etc.
            throw new StartupException(e);
        }
    }
}

Bootstrap.java

The Environment is initialized from the settings map passed in as a parameter.

After that, the main logic is the call to INSTANCE.start(), that is, Bootstrap's start().

Looking at start(), we can see that it starts the Node and then starts keepAliveThread.

final class Bootstrap {
  private static volatile Bootstrap INSTANCE;
  private volatile Node node;
  /** creates a new instance */
  Bootstrap() {
      keepAliveThread = new Thread(new Runnable() {
          @Override
          public void run() {
              try {
                  keepAliveLatch.await();
              } catch (InterruptedException e) {
                  // bail out
              }
          }
      }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
      keepAliveThread.setDaemon(false);
      // keep this thread alive (non daemon thread) until we shutdown
      Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
              keepAliveLatch.countDown();
          }
      });
  }
  /**
   * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
   */
  static void init(
          final boolean foreground,
          final Path pidFile,
          final boolean quiet,
          final Map<String, String> esSettings) throws BootstrapException, NodeValidationException, UserException {
      // Set the system property before anything has a chance to trigger its use
      initLoggerPrefix();
      // force the class initializer for BootstrapInfo to run before
      // the security manager is installed
      BootstrapInfo.init();
      INSTANCE = new Bootstrap();
      Environment environment = initialEnvironment(foreground, pidFile, esSettings);
      try {
          LogConfigurator.configure(environment);
      } catch (IOException e) {
          throw new BootstrapException(e);
      }
      checkForCustomConfFile();
      if (environment.pidFile() != null) {
          try {
              PidFile.create(environment.pidFile(), true);
          } catch (IOException e) {
              throw new BootstrapException(e);
          }
      }
      final boolean closeStandardStreams = (foreground == false) || quiet;
      try {
          if (closeStandardStreams) {
              final Logger rootLogger = ESLoggerFactory.getRootLogger();
              final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
              if (maybeConsoleAppender != null) {
                  Loggers.removeAppender(rootLogger, maybeConsoleAppender);
              }
              closeSystOut();
          }
          // fail if somebody replaced the lucene jars
          checkLucene();
          // install the default uncaught exception handler; must be done before security is
          // initialized as we do not want to grant the runtime permission
          // setDefaultUncaughtExceptionHandler
          Thread.setDefaultUncaughtExceptionHandler(
              new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
          INSTANCE.setup(true, environment);
          INSTANCE.start();
          if (closeStandardStreams) {
              closeSysError();
          }
      } catch (NodeValidationException | RuntimeException e) {
          // disable console logging, so user does not see the exception twice (jvm will show it already)
          final Logger rootLogger = ESLoggerFactory.getRootLogger();
          final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
          if (foreground && maybeConsoleAppender != null) {
              Loggers.removeAppender(rootLogger, maybeConsoleAppender);
          }
          Logger logger = Loggers.getLogger(Bootstrap.class);
          if (INSTANCE.node != null) {
              logger = Loggers.getLogger(Bootstrap.class, Node.NODE_NAME_SETTING.get(INSTANCE.node.settings()));
          }
          // HACK, it sucks to do this, but we will run users out of disk space otherwise
          if (e instanceof CreationException) {
              // guice: log the shortened exc to the log file
              ByteArrayOutputStream os = new ByteArrayOutputStream();
              PrintStream ps = null;
              try {
                  ps = new PrintStream(os, false, "UTF-8");
              } catch (UnsupportedEncodingException uee) {
                  assert false;
                  e.addSuppressed(uee);
              }
              new StartupException(e).printStackTrace(ps);
              ps.flush();
              try {
                  logger.error("Guice Exception: {}", os.toString("UTF-8"));
              } catch (UnsupportedEncodingException uee) {
                  assert false;
                  e.addSuppressed(uee);
              }
          } else if (e instanceof NodeValidationException) {
              logger.error("node validation exception\n{}", e.getMessage());
          } else {
              // full exception
              logger.error("Exception", e);
          }
          // re-enable it if appropriate, so they can see any logging during the shutdown process
          if (foreground && maybeConsoleAppender != null) {
              Loggers.addAppender(rootLogger, maybeConsoleAppender);
          }
          throw e;
      }
  }
  private void start() throws NodeValidationException {
    node.start();
    keepAliveThread.start();
  }
}
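
The keep-alive mechanism in the Bootstrap constructor can be tried in isolation. The sketch below reuses the same idea (a non-daemon thread blocked on a CountDownLatch), but `KeepAliveDemo` and its method names are hypothetical, and the latch is released directly instead of from a JVM shutdown hook so that the demo terminates.

```java
import java.util.concurrent.CountDownLatch;

public class KeepAliveDemo {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    public KeepAliveDemo() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // block until release() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "demo[keepAlive]");
        // A non-daemon thread keeps the JVM running while it is alive.
        keepAliveThread.setDaemon(false);
    }

    public void start() {
        keepAliveThread.start();
    }

    // In Bootstrap the equivalent countDown() happens inside a shutdown hook.
    public void release() {
        keepAliveLatch.countDown();
    }

    public boolean isAlive() {
        return keepAliveThread.isAlive();
    }

    // Waits up to the given time and reports whether the thread has exited.
    public boolean awaitExit(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveDemo demo = new KeepAliveDemo();
        demo.start();
        System.out.println("alive after start: " + demo.isAlive());
        demo.release();
        System.out.println("exited after release: " + demo.awaitExit(1000));
    }
}
```

Without a thread like this, the JVM could exit as soon as main returns if every remaining thread were a daemon thread.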

Node.java

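For Node.java we need to start from the constructor. From this point on, some basic knowledge of Google Guice makes the code easier to follow. The main logic in the constructor is as follows.

  • nodeEnvironment.nodeId() yields the unique node ID.

  • NODE_NAME_SETTING.get(tmpSettings) yields the node name.

  • new NodeClient(settings, threadPool) creates the client instance the node will use.

  • To register Guice modules, a ModulesBuilder is created, the various modules that extend AbstractModule are added to it, and then the injector is created. Each added module binds the classes it depends on in its configure() method.

  • Components that are not registered as Guice AbstractModule subclasses are bound separately.

  • The part to focus on: the HttpServerTransport is obtained from NetworkModule and passed to the HttpServer when it is constructed.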
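
For readers new to dependency injection, the bind-then-lookup flow in the constructor can be pictured with a hand-rolled registry. This is deliberately not the Guice API: `MiniInjector`, `DemoTransport`, and `DemoHttpServer` are hypothetical stand-ins, and real Guice does much more (constructor injection, scopes, providers).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hand-rolled stand-in for the bind/toInstance/getInstance idea; NOT the Guice API.
public class MiniInjector {
    private final Map<Class<?>, Object> bindings = new HashMap<>();

    // Registers a ready-made instance for a type.
    public <T> void bind(Class<T> type, T instance) {
        bindings.put(type, instance);
    }

    // Returns the instance bound to the type, or fails if nothing was bound.
    public <T> T getInstance(Class<T> type) {
        Object instance = bindings.get(type);
        if (instance == null) {
            throw new IllegalStateException("no binding for " + type.getName());
        }
        return type.cast(instance);
    }

    // A "module" here is just a callback that registers bindings.
    public static MiniInjector create(List<Consumer<MiniInjector>> modules) {
        MiniInjector injector = new MiniInjector();
        for (Consumer<MiniInjector> module : modules) {
            module.accept(injector);
        }
        return injector;
    }

    // Hypothetical components standing in for HttpServerTransport and HttpServer.
    public static class DemoTransport {
    }

    public static class DemoHttpServer {
        final DemoTransport transport;

        public DemoHttpServer(DemoTransport transport) {
            this.transport = transport;
        }
    }

    public static void main(String[] args) {
        DemoTransport transport = new DemoTransport();
        DemoHttpServer server = new DemoHttpServer(transport); // transport passed in at construction
        Consumer<MiniInjector> httpModule = b -> {
            b.bind(DemoTransport.class, transport);
            b.bind(DemoHttpServer.class, server);
        };
        MiniInjector injector = MiniInjector.create(Arrays.asList(httpModule));
        System.out.println(injector.getInstance(DemoHttpServer.class).transport == transport);
    }
}
```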

The start() method starts the node. Its main logic is as follows.

  • It checks the lifecycle and does nothing if the node has already been started.

  • It calls start() on the modules loaded and the services bound in the constructor. (For example IndicesService, SearchService, and RestController.)

  • It starts the HTTP server with injector.getInstance(HttpServer.class).start().
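
The "already started means no-op" check can be illustrated with a small state flag. The sketch below assumes nothing about the real Lifecycle class beyond what the listing shows (moveToStarted() returning false when there is nothing to do); `MiniNode` and `startCount` are hypothetical.

```java
public class MiniNode {
    private boolean started = false;
    private int startCount = 0;

    // Returns true only for the first transition into the started state.
    private synchronized boolean moveToStarted() {
        if (started) {
            return false;
        }
        started = true;
        return true;
    }

    // Starts the node once; later calls return immediately.
    public MiniNode start() {
        if (!moveToStarted()) {
            return this;
        }
        startCount++; // stands in for starting the individual services
        return this;
    }

    public int startCount() {
        return startCount;
    }

    public static void main(String[] args) {
        MiniNode node = new MiniNode();
        node.start().start().start();
        System.out.println("services started " + node.startCount() + " time(s)");
    }
}
```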

Since the goal of this post is to see how the HttpServer comes up, we will look at HttpServer's start() next.

public class Node implements Closeable {
  private final Injector injector;
  protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
      final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
      boolean success = false;
      {
          // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
          Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
          logger.info("initializing ...");
      }
      try {
          Settings tmpSettings = Settings.builder().put(environment.settings())
              .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
          tmpSettings = TribeService.processSettings(tmpSettings);
          // create the node environment as soon as possible, to recover the node id and enable logging
          try {
              nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
              resourcesToClose.add(nodeEnvironment);
          } catch (IOException ex) {
              throw new IllegalStateException("Failed to created node environment", ex);
          }
          final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
          Logger logger = Loggers.getLogger(Node.class, tmpSettings);
          final String nodeId = nodeEnvironment.nodeId();
          tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
          // this must be captured after the node name is possibly added to the settings
          final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
          if (hadPredefinedNodeName == false) {
              logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
          } else {
              logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
          }
          final JvmInfo jvmInfo = JvmInfo.jvmInfo();
          logger.info(
              "version[{}], pid[{}], build[{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
              displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
              jvmInfo.pid(),
              Build.CURRENT.shortHash(),
              Build.CURRENT.date(),
              Constants.OS_NAME,
              Constants.OS_VERSION,
              Constants.OS_ARCH,
              Constants.JVM_VENDOR,
              Constants.JVM_NAME,
              Constants.JAVA_VERSION,
              Constants.JVM_VERSION);
          warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
          if (logger.isDebugEnabled()) {
              logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                  environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
          }
          // TODO: Remove this in Elasticsearch 6.0.0
          if (JsonXContent.unquotedFieldNamesSet) {
              DeprecationLogger dLogger = new DeprecationLogger(logger);
              dLogger.deprecated("[{}] has been set, but will be removed in Elasticsearch 6.0.0",
                  JsonXContent.JSON_ALLOW_UNQUOTED_FIELD_NAMES);
          }
          this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
          this.settings = pluginsService.updatedSettings();
          // create the environment based on the finalized (processed) view of the settings
          // this is just to makes sure that people get the same settings, no matter where they ask them from
          this.environment = new Environment(this.settings);
          Environment.assertEquivalent(environment, this.environment);
          final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
          final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
          resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
          // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
          DeprecationLogger.setThreadContext(threadPool.getThreadContext());
          resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
          final List<Setting<?>> additionalSettings = new ArrayList<>();
          final List<String> additionalSettingsFilter = new ArrayList<>();
          additionalSettings.addAll(pluginsService.getPluginSettings());
          additionalSettingsFilter.addAll(pluginsService.getPluginSettingsFilter());
          for (final ExecutorBuilder<?> builder : threadPool.builders()) {
              additionalSettings.addAll(builder.getRegisteredSettings());
          }
          client = new NodeClient(settings, threadPool);
          final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
          final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
              pluginsService.filterPlugins(ScriptPlugin.class));
          AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
          additionalSettings.addAll(scriptModule.getSettings());
          // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
          // so we might be late here already
          final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
          scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
          resourcesToClose.add(resourceWatcherService);
          final NetworkService networkService = new NetworkService(settings,
              getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
          final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
          clusterService.add(scriptModule.getScriptService());
          resourcesToClose.add(clusterService);
          final TribeService tribeService = new TribeService(settings, clusterService, nodeId,
              s -> newTribeClientNode(s, classpathPlugins));
          resourcesToClose.add(tribeService);
          final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
              scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
          final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
          ModulesBuilder modules = new ModulesBuilder();
          // plugin modules must be added here, before others or we can get crazy injection errors...
          for (Module pluginModule : pluginsService.createGuiceModules()) {
              modules.add(pluginModule);
          }
          final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
          modules.add(new NodeModule(this, monitorService));
          ClusterModule clusterModule = new ClusterModule(settings, clusterService,
              pluginsService.filterPlugins(ClusterPlugin.class));
          modules.add(clusterModule);
          IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
          modules.add(indicesModule);
          SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
          ActionModule actionModule = new ActionModule(DiscoveryNode.isIngestNode(settings), false, settings,
              clusterModule.getIndexNameExpressionResolver(), settingsModule.getClusterSettings(),
              pluginsService.filterPlugins(ActionPlugin.class));
          modules.add(actionModule);
          modules.add(new GatewayModule());
          modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class)));
          CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
              settingsModule.getClusterSettings());
          resourcesToClose.add(circuitBreakerService);
          BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
          resourcesToClose.add(bigArrays);
          modules.add(settingsModule);
          List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
              NetworkModule.getNamedWriteables().stream(),
              indicesModule.getNamedWriteables().stream(),
              searchModule.getNamedWriteables().stream(),
              pluginsService.filterPlugins(Plugin.class).stream()
                  .flatMap(p -> p.getNamedWriteables().stream()))
              .flatMap(Function.identity()).collect(Collectors.toList());
          final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
          final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment);
          final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment,
              settingsModule.getClusterSettings(), analysisModule.getAnalysisRegistry(), searchModule.getQueryParserRegistry(),
              clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
              threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
              clusterService, client, metaStateService);
          Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
              .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                                               scriptModule.getScriptService(), searchModule.getSearchRequestParsers()).stream())
              .collect(Collectors.toList());
          Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
              pluginsService.filterPlugins(Plugin.class).stream()
              .map(Plugin::getCustomMetaDataUpgrader)
              .collect(Collectors.toList());
          final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
              bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
          final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
          final Transport transport = networkModule.getTransportSupplier().get();
          final TransportService transportService = newTransportService(settings, transport, threadPool,
              networkModule.getTransportInterceptor(), settingsModule.getClusterSettings());
          final Consumer<Binder> httpBind;
          if (networkModule.isHttpEnabled()) {
              HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
              HttpServer httpServer = new HttpServer(settings, httpServerTransport, actionModule.getRestController(), client,
                  circuitBreakerService);
              httpBind = b -> {
                  b.bind(HttpServer.class).toInstance(httpServer);
                  b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
              };
          } else {
              httpBind = b -> {
                  b.bind(HttpServer.class).toProvider(Providers.of(null));
              };
          }
          final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
              networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
          modules.add(b -> {
                  b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
                  b.bind(SearchRequestParsers.class).toInstance(searchModule.getSearchRequestParsers());
                  b.bind(SearchExtRegistry.class).toInstance(searchModule.getSearchExtRegistry());
                  b.bind(PluginsService.class).toInstance(pluginsService);
                  b.bind(Client.class).toInstance(client);
                  b.bind(NodeClient.class).toInstance(client);
                  b.bind(Environment.class).toInstance(this.environment);
                  b.bind(ThreadPool.class).toInstance(threadPool);
                  b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                  b.bind(TribeService.class).toInstance(tribeService);
                  b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                  b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                  b.bind(BigArrays.class).toInstance(bigArrays);
                  b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
                  b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                  b.bind(IngestService.class).toInstance(ingestService);
                  b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                  b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                  b.bind(MetaStateService.class).toInstance(metaStateService);
                  b.bind(IndicesService.class).toInstance(indicesService);
                  b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
                      threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
                  b.bind(Transport.class).toInstance(transport);
                  b.bind(TransportService.class).toInstance(transportService);
                  b.bind(NetworkService.class).toInstance(networkService);
                  b.bind(AllocationCommandRegistry.class).toInstance(NetworkModule.getAllocationCommandRegistry());
                  b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
                  b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings,
                      indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
                  b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                  b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                  {
                      RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                      processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                      b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
                              indicesService, recoverySettings, clusterService));
                      b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
                              transportService, recoverySettings, clusterService));
                  }
                  httpBind.accept(b);
                  pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
              }
          );
          injector = modules.createInjector();
          List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
              .filter(p -> p instanceof LifecycleComponent)
              .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
          pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
              .map(injector::getInstance).collect(Collectors.toList()));
          resourcesToClose.addAll(pluginLifecycleComponents);
          this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
          client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}));
          logger.info("initialized");
          success = true;
      } catch (IOException ex) {
          throw new ElasticsearchException("failed to bind service", ex);
      } finally {
          if (!success) {
              IOUtils.closeWhileHandlingException(resourcesToClose);
          }
      }
  }
  /**
   * Start the node. If the node is already started, this method is no-op.
   */
  public Node start() throws NodeValidationException {
      if (!lifecycle.moveToStarted()) {
          return this;
      }
      Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
      logger.info("starting ...");
      // hack around dependency injection problem (for now...)
      injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
      pluginLifecycleComponents.forEach(LifecycleComponent::start);
      injector.getInstance(MappingUpdatedAction.class).setClient(client);
      injector.getInstance(IndicesService.class).start();
      injector.getInstance(IndicesClusterStateService.class).start();
      injector.getInstance(IndicesTTLService.class).start();
      injector.getInstance(SnapshotsService.class).start();
      injector.getInstance(SnapshotShardsService.class).start();
      injector.getInstance(RoutingService.class).start();
      injector.getInstance(SearchService.class).start();
      injector.getInstance(MonitorService.class).start();
      injector.getInstance(RestController.class).start();
      final ClusterService clusterService = injector.getInstance(ClusterService.class);
      final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
      nodeConnectionsService.start();
      clusterService.setNodeConnectionsService(nodeConnectionsService);
      // TODO hack around circular dependencies problems
      injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
      injector.getInstance(ResourceWatcherService.class).start();
      injector.getInstance(GatewayService.class).start();
      Discovery discovery = injector.getInstance(Discovery.class);
      clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
      clusterService.setClusterStatePublisher(discovery::publish);
      // start before the cluster service since it adds/removes initial Cluster state blocks
      final TribeService tribeService = injector.getInstance(TribeService.class);
      tribeService.start();
      // Start the transport service now so the publish address will be added to the local disco node in ClusterService
      TransportService transportService = injector.getInstance(TransportService.class);
      transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
      transportService.start();
      validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress());
      DiscoveryNode localNode = DiscoveryNode.createLocal(settings,
          transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class).nodeId());
      // TODO: need to find a cleaner way to start/construct a service with some initial parameters,
      // playing nice with the life cycle interfaces
      clusterService.setLocalNode(localNode);
      transportService.setLocalNode(localNode);
      clusterService.add(transportService.getTaskManager());
      clusterService.start();
      // start after cluster service so the local disco is known
      discovery.start();
      transportService.acceptIncomingRequests();
      discovery.startInitialJoin();
      // tribe nodes don't have a master so we shouldn't register an observer
      final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
      if (initialStateTimeout.millis() > 0) {
          final ThreadPool thread = injector.getInstance(ThreadPool.class);
          ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, thread.getThreadContext());
          if (observer.observedState().nodes().getMasterNodeId() == null) {
              logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
              final CountDownLatch latch = new CountDownLatch(1);
              observer.waitForNextChange(new ClusterStateObserver.Listener() {
                  @Override
                  public void onNewClusterState(ClusterState state) { latch.countDown(); }
                  @Override
                  public void onClusterServiceClose() {
                      latch.countDown();
                  }
                  @Override
                  public void onTimeout(TimeValue timeout) {
                      logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                          initialStateTimeout);
                      latch.countDown();
                  }
              }, MasterNodeChangePredicate.INSTANCE, initialStateTimeout);
              try {
                  latch.await();
              } catch (InterruptedException e) {
                  throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
              }
          }
      }
      if (NetworkModule.HTTP_ENABLED.get(settings)) {
          injector.getInstance(HttpServer.class).start();
      }
      // start nodes now, after the http server, because it may take some time
      tribeService.startNodes();
      if (WRITE_PORTS_FIELD_SETTING.get(settings)) {
          if (NetworkModule.HTTP_ENABLED.get(settings)) {
              HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
              writePortsFile("http", http.boundAddress());
          }
          TransportService transport = injector.getInstance(TransportService.class);
          writePortsFile("transport", transport.boundAddress());
      }
      logger.info("started");
      return this;
  }
}
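
The wait for the initial cluster state near the end of start() follows a common latch pattern: register a listener, then block the starting thread until a new state, a close, or a timeout releases the latch. The block below is only a minimal sketch of that pattern. InitialStateWaiter and InitialStateWaitSketch are hypothetical names standing in for ClusterStateObserver and the node, and a plain ScheduledExecutorService is assumed in place of the ES thread pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the observer/listener pair: it records exactly one outcome.
class InitialStateWaiter {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> outcome = new AtomicReference<>();

    // Plays the role of onNewClusterState(): a master was found.
    void onNewClusterState() { finish("joined"); }

    // Plays the role of onTimeout(): the timeout expired first.
    void onTimeout() { finish("timed out"); }

    private void finish(String result) {
        outcome.compareAndSet(null, result); // only the first outcome is kept
        latch.countDown();                   // every path releases the waiting thread
    }

    // Blocks the caller until one of the callbacks has fired.
    String await() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome.get();
    }
}

class InitialStateWaitSketch {
    // Simulates a join after joinDelayMillis racing against a timeout.
    static String startAndWait(long joinDelayMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            InitialStateWaiter waiter = new InitialStateWaiter();
            Runnable joined = waiter::onNewClusterState;
            Runnable timedOut = waiter::onTimeout;
            scheduler.schedule(joined, joinDelayMillis, TimeUnit.MILLISECONDS);
            scheduler.schedule(timedOut, timeoutMillis, TimeUnit.MILLISECONDS);
            return waiter.await();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(startAndWait(10, 5000));
        System.out.println(startAndWait(5000, 10));
    }
}
```

Note that in the listing above a timeout only logs a warning and releases the latch, so the node keeps starting even if no master was found in time.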

HttpServer.java

As a preliminary note, ES uses Netty as its HttpServer implementation.

We look at HttpServer.doStart() because HttpServer.start() actually resolves to the parent-class method AbstractLifecycleComponent.start(), and that parent method in turn calls doStart().

In doStart(), the main logic is transport.start().

The HttpServerTransport interface has two implementations: Netty3HttpServerTransport and Netty4HttpServerTransport.

In ES 5.1.1, netty4 is used unless the configuration has been changed, so the call ends up in Netty4HttpServerTransport.doStart(). (Netty4HttpServerTransport also extends AbstractLifecycleComponent, so start() reaches doStart() by the same mechanism described above.)

In brief, the Netty implementation gets registered as the HttpServerTransport implementation as follows: when an ES node starts, it scans the plugins and modules directories and registers the plugins it finds, and in that process it finds the transport-netty3 and transport-netty4 directories. This is how Netty4HttpServerTransport and Netty4Transport get registered. A full explanation would make this post too long, so it is omitted.

public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter {
    private final HttpServerTransport transport;
    @Override
    protected void doStart() {
        transport.start();
        if (logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
        }
    }
}
public abstract class AbstractLifecycleComponent extends AbstractComponent implements LifecycleComponent {
  @SuppressWarnings({"unchecked"})
  @Override
  public void start() {
      if (!lifecycle.canMoveToStarted()) {
          return;
      }
      for (LifecycleListener listener : listeners) {
          listener.beforeStart();
      }
      doStart();
      lifecycle.moveToStarted();
      for (LifecycleListener listener : listeners) {
          listener.afterStart();
      }
  }
}
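
The relationship between start() and doStart() is the template method pattern: the parent class owns the public entry point and the state check, and each subclass only fills in doStart(). The following is a minimal sketch under that reading. SketchLifecycleComponent and SketchHttpServer are hypothetical names, and the listener callbacks are reduced to entries in an event list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for AbstractLifecycleComponent.
abstract class SketchLifecycleComponent {
    private boolean started = false;
    public final List<String> events = new ArrayList<>();

    // Public entry point: guards against a second start, then calls the subclass hook.
    public final void start() {
        if (started) {
            return; // already started, so this call is a no-op
        }
        events.add("beforeStart");
        doStart();
        started = true;
        events.add("afterStart");
    }

    // Subclasses put their actual start-up work here.
    protected abstract void doStart();
}

// Hypothetical stand-in for HttpServer: its doStart() only starts the transport.
class SketchHttpServer extends SketchLifecycleComponent {
    @Override
    protected void doStart() {
        events.add("transport.start()");
    }
}

class LifecycleSketch {
    public static void main(String[] args) {
        SketchHttpServer server = new SketchHttpServer();
        server.start();
        server.start(); // second call does nothing
        System.out.println(server.events);
    }
}
```

This is why, when following a start() call in the ES source, the place to look next is usually the doStart() of the concrete class.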

Netty4HttpServerTransport.java

Netty4HttpServerTransport.doStart() carries out the work of starting the Netty server.

Netty registers handlers on a channel; in this code the handler is registered with serverBootstrap.childHandler(configureServerChannelHandler()).

configureServerChannelHandler() creates an HttpChannelHandler.

HttpChannelHandler instantiates a Netty4HttpRequestHandler as requestHandler and adds it at the end of the channel pipeline.

public class Netty4HttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
  @Override
  protected void doStart() {
      boolean success = false;
      try {
          this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
          serverBootstrap = new ServerBootstrap();
          if (blockingServer) {
              serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                  HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
              serverBootstrap.channel(OioServerSocketChannel.class);
          } else {
              serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                  HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
              serverBootstrap.channel(NioServerSocketChannel.class);
          }
          serverBootstrap.childHandler(configureServerChannelHandler());
          serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
          serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
          final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
          if (tcpSendBufferSize.getBytes() > 0) {
              serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
          }
          final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
          if (tcpReceiveBufferSize.getBytes() > 0) {
              serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
          }
          serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
          serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
          final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
          serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
          serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
          this.boundAddress = createBoundHttpAddress();
          success = true;
      } finally {
          if (success == false) {
              doStop(); // otherwise we leak threads since we never moved to started
          }
      }
  }
  public ChannelHandler configureServerChannelHandler() {
    return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext());
  }
  protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
    private final Netty4HttpServerTransport transport;
    private final Netty4HttpRequestHandler requestHandler;
    protected HttpChannelHandler(
        final Netty4HttpServerTransport transport,
        final boolean detailedErrorsEnabled,
        final ThreadContext threadContext) {
        this.transport = transport;
        this.requestHandler = new Netty4HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
        final HttpRequestDecoder decoder = new HttpRequestDecoder(
            Math.toIntExact(transport.maxInitialLineLength.getBytes()),
            Math.toIntExact(transport.maxHeaderSize.getBytes()),
            Math.toIntExact(transport.maxChunkSize.getBytes()));
        decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
        ch.pipeline().addLast("decoder", decoder);
        ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());
        final HttpObjectAggregator aggregator = new HttpObjectAggregator(Math.toIntExact(transport.maxContentLength.getBytes()));
        if (transport.maxCompositeBufferComponents != -1) {
            aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
        }
        ch.pipeline().addLast("aggregator", aggregator);
        if (transport.compression) {
            ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
        }
        if (SETTING_CORS_ENABLED.get(transport.settings())) {
            ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
        }
        if (transport.pipelining) {
            ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
        }
        ch.pipeline().addLast("handler", requestHandler);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Netty4Utils.maybeDie(cause);
        super.exceptionCaught(ctx, cause);
    }
  }
}
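
The order of the addLast() calls in initChannel() matters: an inbound request passes through the handlers in the order they were added, which is why the request handler is added last. The sketch below illustrates only that ordering idea in plain Java. It does not use Netty; MiniPipeline and PipelineSketch are hypothetical, the handlers are simple string transformations, and outbound handling is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical miniature of a channel pipeline; it does not use Netty.
class MiniPipeline {
    // LinkedHashMap keeps handlers in the order they were added.
    private final Map<String, UnaryOperator<String>> handlers = new LinkedHashMap<>();

    // Appends a named handler at the tail, similar in spirit to addLast(name, handler).
    MiniPipeline addLast(String name, UnaryOperator<String> handler) {
        handlers.put(name, handler);
        return this;
    }

    List<String> names() {
        return new ArrayList<>(handlers.keySet());
    }

    // Passes an inbound message through every handler, head to tail.
    String fireInbound(String message) {
        String current = message;
        for (UnaryOperator<String> handler : handlers.values()) {
            current = handler.apply(current);
        }
        return current;
    }
}

class PipelineSketch {
    // Fixed stages first, optional stages next, request handler last.
    static MiniPipeline build(boolean corsEnabled) {
        MiniPipeline pipeline = new MiniPipeline()
            .addLast("decoder", bytes -> "decoded(" + bytes + ")")
            .addLast("aggregator", part -> "full(" + part + ")");
        if (corsEnabled) {
            pipeline.addLast("cors", request -> request); // pass-through in this sketch
        }
        return pipeline.addLast("handler", request -> "dispatched(" + request + ")");
    }

    public static void main(String[] args) {
        System.out.println(build(true).names());
        System.out.println(build(false).fireInbound("GET /"));
    }
}
```

By the time the message reaches "handler" it has already been decoded and aggregated, which matches the cast to FullHttpRequest in the request handler.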

Netty4HttpRequestHandler.java

Netty4HttpRequestHandler.channelRead0() is where a request arriving at the HTTP server is processed.

The point to note here is that the request is handed to serverTransport.dispatchRequest(), which takes us back to Netty4HttpServerTransport.dispatchRequest().

class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
      final FullHttpRequest request;
      final HttpPipelinedRequest pipelinedRequest;
      if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) {
          pipelinedRequest = (HttpPipelinedRequest) msg;
          request = (FullHttpRequest) pipelinedRequest.last();
      } else {
          pipelinedRequest = null;
          request = (FullHttpRequest) msg;
      }
      final FullHttpRequest copy =
              new DefaultFullHttpRequest(
                      request.protocolVersion(),
                      request.method(),
                      request.uri(),
                      Unpooled.copiedBuffer(request.content()),
                      request.headers(),
                      request.trailingHeaders());
      final Netty4HttpRequest httpRequest = new Netty4HttpRequest(copy, ctx.channel());
      serverTransport.dispatchRequest(
          httpRequest,
          new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, detailedErrorsEnabled, threadContext));
  }
}

Netty4HttpServerTransport.java

Here httpServerAdapter.dispatchRequest() is called, which hands control back to HttpServer.

public class Netty4HttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
  protected void dispatchRequest(RestRequest request, RestChannel channel) {
      httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
  }
}
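
The hand-off so far is a chain of delegations: the request handler calls the transport, and the transport calls back into the server through the HttpServerAdapter interface, so the transport never has to know the HttpServer class itself. The sketch below traces that chain with hypothetical Sketch* classes. In particular, the way the server registers itself with the transport in its constructor is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of HttpServerAdapter: the only thing the transport knows about.
interface SketchServerAdapter {
    void dispatchRequest(String request, List<String> trace);
}

// Hypothetical counterpart of the HTTP server transport.
class SketchTransport {
    private SketchServerAdapter adapter;

    void setAdapter(SketchServerAdapter adapter) {
        this.adapter = adapter;
    }

    void dispatchRequest(String request, List<String> trace) {
        trace.add("transport");
        adapter.dispatchRequest(request, trace); // back to the server via the interface
    }
}

// Hypothetical counterpart of HttpServer.
class SketchServer implements SketchServerAdapter {
    SketchServer(SketchTransport transport) {
        transport.setAdapter(this); // assumed wiring for this sketch
    }

    @Override
    public void dispatchRequest(String request, List<String> trace) {
        trace.add("server");
        trace.add("restController:" + request);
    }
}

// Hypothetical counterpart of the Netty request handler at the end of the pipeline.
class SketchRequestHandler {
    private final SketchTransport transport;

    SketchRequestHandler(SketchTransport transport) {
        this.transport = transport;
    }

    void channelRead(String request, List<String> trace) {
        trace.add("requestHandler");
        transport.dispatchRequest(request, trace);
    }
}

class DelegationSketch {
    static List<String> run(String request) {
        SketchTransport transport = new SketchTransport();
        new SketchServer(transport);
        List<String> trace = new ArrayList<>();
        new SketchRequestHandler(transport).channelRead(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("GET /"));
    }
}
```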

HttpServer.java

The important part of HttpServer.dispatchRequest() is the call to restController.dispatchRequest(). This is where requests that arrive over REST are processed.

public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter {
  public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
      if (request.rawPath().equals("/favicon.ico")) {
          handleFavicon(request, channel);
          return;
      }
      RestChannel responseChannel = channel;
      try {
          int contentLength = request.content().length();
          if (restController.canTripCircuitBreaker(request)) {
              inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
          } else {
              inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
          }
          // iff we could reserve bytes for the request we need to send the response also over this channel
          responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
          restController.dispatchRequest(request, responseChannel, client, threadContext);
      } catch (Exception e) {
          restController.sendErrorResponse(request, responseChannel, e);
      }
  }
}
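
Before dispatching, the listing above also reserves the request's content length in an in-flight-requests circuit breaker, either with a check that may fail or without one. A minimal sketch of that accounting idea follows. SketchInFlightBreaker is a hypothetical class that only borrows the method names from the listing; it is not the ES CircuitBreaker. Releasing the bytes in a finally block is a simplification, since in the listing the release is tied to the response channel.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical byte-accounting breaker; only the method names mirror the listing.
class SketchInFlightBreaker {
    private final long limitBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    SketchInFlightBreaker(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Reserves bytes and fails if the limit would be exceeded.
    void addEstimateBytesAndMaybeBreak(long bytes, String label) {
        long newUsed = usedBytes.addAndGet(bytes);
        if (newUsed > limitBytes) {
            usedBytes.addAndGet(-bytes); // roll back the failed reservation
            throw new IllegalStateException("[" + label + "] too large: " + newUsed + " > " + limitBytes);
        }
    }

    // Adjusts the counter without ever failing; a negative value releases bytes.
    void addWithoutBreaking(long bytes) {
        usedBytes.addAndGet(bytes);
    }

    long used() {
        return usedBytes.get();
    }
}

class BreakerDispatchSketch {
    static String dispatch(SketchInFlightBreaker breaker, String body, boolean canTrip) {
        int contentLength = body.length();
        try {
            if (canTrip) {
                breaker.addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
            } else {
                breaker.addWithoutBreaking(contentLength);
            }
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
        try {
            return "handled " + contentLength + " bytes";
        } finally {
            breaker.addWithoutBreaking(-contentLength); // release once the request is done
        }
    }

    public static void main(String[] args) {
        SketchInFlightBreaker breaker = new SketchInFlightBreaker(10);
        System.out.println(dispatch(breaker, "12345", true));
        System.out.println(dispatch(breaker, new String(new char[20]), true));
        System.out.println(dispatch(breaker, new String(new char[20]), false));
    }
}
```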

RestController.java

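RestController.dispatchRequest() calls executeHandler(), directly when no filters are registered and through the filter chain otherwise.

executeHandler() looks up the RestHandler that matches the request and calls handler.handleRequest().

Note also that client is the NodeClient that was created in Node.java.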

public class RestController extends AbstractLifecycleComponent {
  public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
      if (!checkRequestParameters(request, channel)) {
          return;
      }
      try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
          for (String key : headersToCopy) {
              String httpHeader = request.header(key);
              if (httpHeader != null) {
                  threadContext.putHeader(key, httpHeader);
              }
          }
          if (filters.length == 0) {
              executeHandler(request, channel, client);
          } else {
              ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter);
              filterChain.continueProcessing(request, channel, client);
          }
      }
  }
  void executeHandler(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    final RestHandler handler = getHandler(request);
    if (handler != null) {
        handler.handleRequest(request, channel, client);
    } else {
        if (request.method() == RestRequest.Method.OPTIONS) {
            // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
            channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
        } else {
            final String msg = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
            channel.sendResponse(new BytesRestResponse(BAD_REQUEST, msg));
        }
    }
  }
}
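
The lookup-then-fallback logic of executeHandler() can be sketched with a plain map keyed by method and path. This is only an illustration under simplifying assumptions: SketchRestController is hypothetical, handlers are functions from path to response text, and matching is exact, whereas the real controller also has to match paths that contain parameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, exact-match stand-in for RestController.
class SketchRestController {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Registers a handler for one method and path.
    void registerHandler(String method, String path, Function<String, String> handler) {
        handlers.put(method + " " + path, handler);
    }

    // Mirrors executeHandler(): use the handler if found, otherwise fall back.
    String dispatch(String method, String path) {
        Function<String, String> handler = handlers.get(method + " " + path);
        if (handler != null) {
            return handler.apply(path);
        }
        if ("OPTIONS".equals(method)) {
            return "200 OK"; // OPTIONS without a handler is answered with an empty OK
        }
        return "400 No handler found for uri [" + path + "] and method [" + method + "]";
    }
}

class RestDispatchSketch {
    public static void main(String[] args) {
        SketchRestController controller = new SketchRestController();
        controller.registerHandler("GET", "/", path -> "200 main");
        System.out.println(controller.dispatch("GET", "/"));
        System.out.println(controller.dispatch("OPTIONS", "/anything"));
        System.out.println(controller.dispatch("POST", "/"));
    }
}
```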

BaseRestHandler.java

Calling handler.handleRequest() actually invokes BaseRestHandler.handleRequest().

The key call here is prepareRequest(): among the RestxxxAction classes that extend BaseRestHandler, the prepareRequest() of the implementation mapped to the request is the one that runs.

(xxx stands for a * wildcard.)

The handler in question is the one returned by getHandler() in RestController.executeHandler(), mentioned above.

public abstract class BaseRestHandler extends AbstractComponent implements RestHandler {
  @Override
  public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
      // prepare the request for execution; has the side effect of touching the request parameters
      final RestChannelConsumer action = prepareRequest(request, client);
      // validate unconsumed params, but we must exclude params used to format the response
      // use a sorted set so the unconsumed parameters appear in a reliable sorted order
      final SortedSet<String> unconsumedParams =
          request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
      // validate the non-response params
      if (!unconsumedParams.isEmpty()) {
          final Set<String> candidateParams = new HashSet<>();
          candidateParams.addAll(request.consumedParams());
          candidateParams.addAll(responseParams());
          throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
      }
      // execute the action
      action.accept(channel);
  }
}
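
handleRequest() works in three steps: prepare the action (which reads, and thereby "consumes", the parameters the handler understands), reject the request if any parameter was left unconsumed, and only then run the action. The sketch below reproduces that flow with hypothetical Sketch* classes. The exclusion of response-formatting parameters is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical request that remembers which parameters have been read.
class SketchRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    SketchRequest(Map<String, String> params) {
        this.params = params;
    }

    // Reading a parameter marks it as consumed.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Sorted so that error messages list parameters in a stable order.
    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

// Hypothetical stand-in for BaseRestHandler.
abstract class SketchBaseHandler {
    final void handleRequest(SketchRequest request, List<String> channel) {
        Consumer<List<String>> action = prepareRequest(request); // step 1: prepare
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {                              // step 2: validate
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);                                   // step 3: execute
    }

    protected abstract Consumer<List<String>> prepareRequest(SketchRequest request);
}

// Hypothetical concrete handler that understands only the "pretty" parameter.
class SketchMainHandler extends SketchBaseHandler {
    @Override
    protected Consumer<List<String>> prepareRequest(SketchRequest request) {
        String pretty = request.param("pretty");
        return channel -> channel.add("main response, pretty=" + pretty);
    }
}

class HandleRequestSketch {
    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        new SketchMainHandler().handleRequest(
            new SketchRequest(Collections.singletonMap("pretty", "true")), channel);
        System.out.println(channel);
    }
}
```

A misspelled parameter is never read by prepareRequest(), so it stays unconsumed and the request fails before the action runs.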

RestMainAction.java

For "curl localhost:9200", the request is handled by prepareRequest() of RestMainAction, which extends BaseRestHandler.

There are several ways to confirm that this request is handled by RestMainAction:

  • Log the handler returned by getHandler() in RestController.executeHandler().

  • Look at the constructors of the RestxxxAction classes, as in the code below, to see which Action handles which URL.

  • Searching for code by feature using the RestxxxAction pattern is the fastest approach. (xxx stands for a * wildcard.)

As for the code below, the constructor registers with RestController which requests are mapped to this implementation.

The request is then executed through client.execute(). This client is the NodeClient.

public class RestMainAction extends BaseRestHandler {
  @Inject
  public RestMainAction(Settings settings, RestController controller) {
      super(settings);
      controller.registerHandler(GET, "/", this);
      controller.registerHandler(HEAD, "/", this);
  }
  @Override
  public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
      logger.info("A message logged here shows up in the log.");
      return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) {
          @Override
          public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
              return convertMainResponse(mainResponse, request, builder);
          }
      });
  }
  static BytesRestResponse convertMainResponse(MainResponse response, RestRequest request, XContentBuilder builder) throws IOException {
      RestStatus status = response.isAvailable() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE;
      // Default to pretty printing, but allow ?pretty=false to disable
      if (request.hasParam("pretty") == false) {
          builder.prettyPrint().lfAtEnd();
      }
      response.toXContent(builder, request);
      return new BytesRestResponse(status, builder);
  }
}
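
Two details of the listing above are easy to miss: prepareRequest() does not answer the request itself but returns a callback that runs later against the channel, and the response is built inside a listener once the client has produced a result. The sketch below mirrors that shape, along with the two decisions made in convertMainResponse() (status from availability, pretty printing unless a pretty parameter is present). All Sketch* names are hypothetical, and the client here answers immediately on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical listener: receives the result of the action.
interface SketchActionListener {
    void onResponse(boolean available);
}

// Hypothetical client: answers immediately with a fixed availability flag.
class SketchMainClient {
    private final boolean available;

    SketchMainClient(boolean available) {
        this.available = available;
    }

    void execute(String actionName, SketchActionListener listener) {
        listener.onResponse(available);
    }
}

// Hypothetical counterpart of RestMainAction.
class SketchRestMainAction {
    // Returns a callback; nothing is sent until the callback is run with a channel.
    Consumer<List<String>> prepareRequest(boolean hasPrettyParam, SketchMainClient client) {
        return channel -> client.execute("main",
            available -> channel.add(convert(available, hasPrettyParam)));
    }

    // Status depends on availability; pretty printing is the default.
    static String convert(boolean available, boolean hasPrettyParam) {
        String status = available ? "200 OK" : "503 SERVICE_UNAVAILABLE";
        String format = hasPrettyParam ? "format from request" : "pretty by default";
        return status + " (" + format + ")";
    }
}

class MainActionSketch {
    static List<String> call(boolean available, boolean hasPrettyParam) {
        List<String> channel = new ArrayList<>();
        new SketchRestMainAction()
            .prepareRequest(hasPrettyParam, new SketchMainClient(available))
            .accept(channel);
        return channel;
    }

    public static void main(String[] args) {
        System.out.println(call(true, false));
        System.out.println(call(false, true));
    }
}
```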

AbstractClient.java

client.execute() invokes the method below, which in turn calls NodeClient.doExecute().

public abstract class AbstractClient extends AbstractComponent implements Client {
  @Override
  public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
          Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
      listener = threadedWrapper.wrap(listener);
      doExecute(action, request, listener);
  }
}

NodeClient.java

NodeClient.doExecute() calls NodeClient.executeLocally().

NodeClient.executeLocally() finds the matching TransportxxxAction class, each of which extends the abstract class TransportAction, and calls its execute() method.

Following the transportAction(action).execute() call further shows that it ends up calling doExecute() on the concrete TransportAction implementation.

To understand which TransportAction implementation an action is mapped to, and how, let's take a brief look at another piece of code.

public class NodeClient extends AbstractClient {
  @Override
  public <    Request extends ActionRequest,
              Response extends ActionResponse,
              RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
          > void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
      // Discard the task because the Client interface doesn't use it.
      logger.info("NodeClient.doExecute()");
      executeLocally(action, request, listener);
  }
  public <    Request extends ActionRequest,
          Response extends ActionResponse
      > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
  }
}
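
Conceptually, transportAction(action) is a map lookup: the client is initialized once with a map from action instances to their transport implementations (the client.initialize(...) call in the Node constructor) and resolves every request through it. The sketch below shows that idea only. SketchAction and SketchLocalClient are hypothetical, transport actions are reduced to functions, the action name is made up, and failing with an exception on a missing entry is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical action marker with a singleton instance, in the spirit of MainAction.INSTANCE.
class SketchAction {
    static final SketchAction MAIN = new SketchAction("sketch:main");
    static final SketchAction OTHER = new SketchAction("sketch:other");

    final String name;

    private SketchAction(String name) {
        this.name = name;
    }
}

// Hypothetical counterpart of NodeClient.
class SketchLocalClient {
    private Map<SketchAction, Function<String, String>> actions;

    // Called once at node construction time with the full action map.
    void initialize(Map<SketchAction, Function<String, String>> actions) {
        this.actions = actions;
    }

    // Resolves the transport action and runs it in the local process.
    String executeLocally(SketchAction action, String request) {
        return transportAction(action).apply(request);
    }

    private Function<String, String> transportAction(SketchAction action) {
        if (actions == null) {
            throw new IllegalStateException("client has not been initialized");
        }
        Function<String, String> transportAction = actions.get(action);
        if (transportAction == null) {
            throw new IllegalStateException("no transport action for [" + action.name + "]");
        }
        return transportAction;
    }
}

class LocalClientSketch {
    static SketchLocalClient newClient() {
        Map<SketchAction, Function<String, String>> actions = new HashMap<>();
        actions.put(SketchAction.MAIN, request -> "TransportMain handled " + request);
        SketchLocalClient client = new SketchLocalClient();
        client.initialize(actions);
        return client;
    }

    public static void main(String[] args) {
        System.out.println(newClient().executeLocally(SketchAction.MAIN, "MainRequest"));
    }
}
```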

ActionModule.java

This class is very important for understanding the ES source.

Its sudden appearance may be surprising, but ActionModule itself is registered in the Node.java constructor, so it is well within the scope of this walkthrough.

The RestxxxAction and TransportxxxAction classes are registered through this class.

setupRestHandlers() registers each RestxxxAction implementation, and the constructor of each implementation maps its request URIs. For example, "curl localhost:9200/" is mapped to RestMainAction.

setupActions() registers the TransportxxxAction classes. For example, MainAction.INSTANCE is bound to TransportMainAction. MainAction.INSTANCE is simply new MainAction().

public class ActionModule extends AbstractModule {
  private final boolean transportClient;
  private final Settings settings;
  private final List<ActionPlugin> actionPlugins;
  private final Map<String, ActionHandler<?, ?>> actions;
  private final List<Class<? extends ActionFilter>> actionFilters;
  private final AutoCreateIndex autoCreateIndex;
  private final DestructiveOperations destructiveOperations;
  private final RestController restController;
  public ActionModule(boolean ingestEnabled, boolean transportClient, Settings settings, IndexNameExpressionResolver resolver,
          ClusterSettings clusterSettings, List<ActionPlugin> actionPlugins) {
      this.transportClient = transportClient;
      this.settings = settings;
      this.actionPlugins = actionPlugins;
      actions = setupActions(actionPlugins);
      actionFilters = setupActionFilters(actionPlugins, ingestEnabled);
      autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, resolver);
      destructiveOperations = new DestructiveOperations(settings, clusterSettings);
      Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
      restController = new RestController(settings, headers);
  }
  public Map<String, ActionHandler<?, ?>> getActions() {
      return actions;
  }
  static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
      // Subclass NamedRegistry for easy registration
      class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
          public ActionRegistry() {
              super("action");
          }
          public void register(ActionHandler<?, ?> handler) {
              register(handler.getAction().name(), handler);
          }
          public <Request extends ActionRequest, Response extends ActionResponse> void register(
                  GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
                  Class<?>... supportTransportActions) {
              register(new ActionHandler<>(action, transportAction, supportTransportActions));
          }
      }
      ActionRegistry actions = new ActionRegistry();
      actions.register(MainAction.INSTANCE, TransportMainAction.class);
      actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
      actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
      actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
      actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
      actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
      actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
      actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
      actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
      actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
      actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
      actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
      actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
      actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
      actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
      actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
      actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
      actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
      actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
      actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
      actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
      actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
      actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
      actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
      actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
      actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
      actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
      actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
      actions.register(ShrinkAction.INSTANCE, TransportShrinkAction.class);
      actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class);
      actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
      actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
      actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
      actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
      actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
      actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
      actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
      actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
              TransportGetFieldMappingsIndexAction.class);
      actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
      actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
      actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
      actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
      actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
      actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
      actions.register(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
      actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
      actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
      actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
      actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
      actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
      actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
      actions.register(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
      actions.register(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
      actions.register(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
      actions.register(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
      actions.register(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
      actions.register(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
      actions.register(IndexAction.INSTANCE, TransportIndexAction.class);
      actions.register(GetAction.INSTANCE, TransportGetAction.class);
      actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
      actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
              TransportShardMultiTermsVectorAction.class);
      actions.register(DeleteAction.INSTANCE, TransportDeleteAction.class);
      actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
      actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
              TransportShardMultiGetAction.class);
      actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
              TransportShardBulkAction.class);
      actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
      actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
      actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
      actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
      actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
      actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
      //Indexed scripts
      actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
      actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
      actions.register(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class);
      actions.register(FieldStatsAction.INSTANCE, TransportFieldStatsAction.class);
      actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
      actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
      actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
      actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
      actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
      return unmodifiableMap(actions.getRegistry());
  }
  private List<Class<? extends ActionFilter>> setupActionFilters(List<ActionPlugin> actionPlugins, boolean ingestEnabled) {
      List<Class<? extends ActionFilter>> filters = new ArrayList<>();
      if (transportClient == false) {
          if (ingestEnabled) {
              filters.add(IngestActionFilter.class);
          } else {
              filters.add(IngestProxyActionFilter.class);
          }
      }
      for (ActionPlugin plugin : actionPlugins) {
          filters.addAll(plugin.getActionFilters());
      }
      return unmodifiableList(filters);
  }
  static Set<Class<? extends RestHandler>> setupRestHandlers(List<ActionPlugin> actionPlugins) {
      Set<Class<? extends RestHandler>> handlers = new HashSet<>();
      registerRestHandler(handlers, RestMainAction.class);
      registerRestHandler(handlers, RestNodesInfoAction.class);
      registerRestHandler(handlers, RestNodesStatsAction.class);
      registerRestHandler(handlers, RestNodesHotThreadsAction.class);
      registerRestHandler(handlers, RestClusterAllocationExplainAction.class);
      registerRestHandler(handlers, RestClusterStatsAction.class);
      registerRestHandler(handlers, RestClusterStateAction.class);
      registerRestHandler(handlers, RestClusterHealthAction.class);
      registerRestHandler(handlers, RestClusterUpdateSettingsAction.class);
      registerRestHandler(handlers, RestClusterGetSettingsAction.class);
      registerRestHandler(handlers, RestClusterRerouteAction.class);
      registerRestHandler(handlers, RestClusterSearchShardsAction.class);
      registerRestHandler(handlers, RestPendingClusterTasksAction.class);
      registerRestHandler(handlers, RestPutRepositoryAction.class);
      registerRestHandler(handlers, RestGetRepositoriesAction.class);
      registerRestHandler(handlers, RestDeleteRepositoryAction.class);
      registerRestHandler(handlers, RestVerifyRepositoryAction.class);
      registerRestHandler(handlers, RestGetSnapshotsAction.class);
      registerRestHandler(handlers, RestCreateSnapshotAction.class);
      registerRestHandler(handlers, RestRestoreSnapshotAction.class);
      registerRestHandler(handlers, RestDeleteSnapshotAction.class);
      registerRestHandler(handlers, RestSnapshotsStatusAction.class);
      registerRestHandler(handlers, RestIndicesExistsAction.class);
      registerRestHandler(handlers, RestTypesExistsAction.class);
      registerRestHandler(handlers, RestGetIndicesAction.class);
      registerRestHandler(handlers, RestIndicesStatsAction.class);
      registerRestHandler(handlers, RestIndicesSegmentsAction.class);
      registerRestHandler(handlers, RestIndicesShardStoresAction.class);
      registerRestHandler(handlers, RestGetAliasesAction.class);
      registerRestHandler(handlers, RestAliasesExistAction.class);
      registerRestHandler(handlers, RestIndexDeleteAliasesAction.class);
      registerRestHandler(handlers, RestIndexPutAliasAction.class);
      registerRestHandler(handlers, RestIndicesAliasesAction.class);
      registerRestHandler(handlers, RestCreateIndexAction.class);
      registerRestHandler(handlers, RestShrinkIndexAction.class);
      registerRestHandler(handlers, RestRolloverIndexAction.class);
      registerRestHandler(handlers, RestDeleteIndexAction.class);
      registerRestHandler(handlers, RestCloseIndexAction.class);
      registerRestHandler(handlers, RestOpenIndexAction.class);
      registerRestHandler(handlers, RestUpdateSettingsAction.class);
      registerRestHandler(handlers, RestGetSettingsAction.class);
      registerRestHandler(handlers, RestAnalyzeAction.class);
      registerRestHandler(handlers, RestGetIndexTemplateAction.class);
      registerRestHandler(handlers, RestPutIndexTemplateAction.class);
      registerRestHandler(handlers, RestDeleteIndexTemplateAction.class);
      registerRestHandler(handlers, RestHeadIndexTemplateAction.class);
      registerRestHandler(handlers, RestPutMappingAction.class);
      registerRestHandler(handlers, RestGetMappingAction.class);
      registerRestHandler(handlers, RestGetFieldMappingAction.class);
      registerRestHandler(handlers, RestRefreshAction.class);
      registerRestHandler(handlers, RestFlushAction.class);
      registerRestHandler(handlers, RestSyncedFlushAction.class);
      registerRestHandler(handlers, RestForceMergeAction.class);
      registerRestHandler(handlers, RestUpgradeAction.class);
      registerRestHandler(handlers, RestClearIndicesCacheAction.class);
      registerRestHandler(handlers, RestIndexAction.class);
      registerRestHandler(handlers, RestGetAction.class);
      registerRestHandler(handlers, RestGetSourceAction.class);
      registerRestHandler(handlers, RestHeadAction.Document.class);
      registerRestHandler(handlers, RestHeadAction.Source.class);
      registerRestHandler(handlers, RestMultiGetAction.class);
      registerRestHandler(handlers, RestDeleteAction.class);
      registerRestHandler(handlers, org.elasticsearch.rest.action.document.RestCountAction.class);
      registerRestHandler(handlers, RestSuggestAction.class);
      registerRestHandler(handlers, RestTermVectorsAction.class);
      registerRestHandler(handlers, RestMultiTermVectorsAction.class);
      registerRestHandler(handlers, RestBulkAction.class);
      registerRestHandler(handlers, RestUpdateAction.class);
      registerRestHandler(handlers, RestSearchAction.class);
      registerRestHandler(handlers, RestSearchScrollAction.class);
      registerRestHandler(handlers, RestClearScrollAction.class);
      registerRestHandler(handlers, RestMultiSearchAction.class);
      registerRestHandler(handlers, RestValidateQueryAction.class);
      registerRestHandler(handlers, RestExplainAction.class);
      registerRestHandler(handlers, RestRecoveryAction.class);
      // Scripts API
      registerRestHandler(handlers, RestGetStoredScriptAction.class);
      registerRestHandler(handlers, RestPutStoredScriptAction.class);
      registerRestHandler(handlers, RestDeleteStoredScriptAction.class);
      registerRestHandler(handlers, RestFieldStatsAction.class);
      // Tasks API
      registerRestHandler(handlers, RestListTasksAction.class);
      registerRestHandler(handlers, RestGetTaskAction.class);
      registerRestHandler(handlers, RestCancelTasksAction.class);
      // Ingest API
      registerRestHandler(handlers, RestPutPipelineAction.class);
      registerRestHandler(handlers, RestGetPipelineAction.class);
      registerRestHandler(handlers, RestDeletePipelineAction.class);
      registerRestHandler(handlers, RestSimulatePipelineAction.class);
      // CAT API
      registerRestHandler(handlers, RestCatAction.class);
      registerRestHandler(handlers, RestAllocationAction.class);
      registerRestHandler(handlers, RestShardsAction.class);
      registerRestHandler(handlers, RestMasterAction.class);
      registerRestHandler(handlers, RestNodesAction.class);
      registerRestHandler(handlers, RestTasksAction.class);
      registerRestHandler(handlers, RestIndicesAction.class);
      registerRestHandler(handlers, RestSegmentsAction.class);
      // Fully qualified to prevent interference with rest.action.count.RestCountAction
      registerRestHandler(handlers, org.elasticsearch.rest.action.cat.RestCountAction.class);
      // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
      registerRestHandler(handlers, org.elasticsearch.rest.action.cat.RestRecoveryAction.class);
      registerRestHandler(handlers, RestHealthAction.class);
      registerRestHandler(handlers, org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction.class);
      registerRestHandler(handlers, RestAliasAction.class);
      registerRestHandler(handlers, RestThreadPoolAction.class);
      registerRestHandler(handlers, RestPluginsAction.class);
      registerRestHandler(handlers, RestFielddataAction.class);
      registerRestHandler(handlers, RestNodeAttrsAction.class);
      registerRestHandler(handlers, RestRepositoriesAction.class);
      registerRestHandler(handlers, RestSnapshotAction.class);
      registerRestHandler(handlers, RestTemplatesAction.class);
      for (ActionPlugin plugin : actionPlugins) {
          for (Class<? extends RestHandler> handler : plugin.getRestHandlers()) {
              registerRestHandler(handlers, handler);
          }
      }
      return handlers;
  }
  private static void registerRestHandler(Set<Class<? extends RestHandler>> handlers, Class<? extends RestHandler> handler) {
      if (handlers.contains(handler)) {
          throw new IllegalArgumentException("can't register the same [rest_handler] more than once for [" + handler.getName() + "]");
      }
      handlers.add(handler);
  }
  @Override
  protected void configure() {
      Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
      for (Class<? extends ActionFilter> actionFilter : actionFilters) {
          actionFilterMultibinder.addBinding().to(actionFilter);
      }
      bind(ActionFilters.class).asEagerSingleton();
      bind(DestructiveOperations.class).toInstance(destructiveOperations);
      if (false == transportClient) {
          // Supporting classes only used when not a transport client
          bind(AutoCreateIndex.class).toInstance(autoCreateIndex);
          bind(TransportLivenessAction.class).asEagerSingleton();
          // register GenericAction -> transportAction Map used by NodeClient
          @SuppressWarnings("rawtypes")
          MapBinder<GenericAction, TransportAction> transportActionsBinder
                  = MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class);
          for (ActionHandler<?, ?> action : actions.values()) {
              // bind the action as eager singleton, so the map binder one will reuse it
              bind(action.getTransportAction()).asEagerSingleton();
              transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
              for (Class<?> supportAction : action.getSupportTransportActions()) {
                  bind(supportAction).asEagerSingleton();
              }
          }
          // Bind the RestController which is required (by Node) even if rest isn't enabled.
          bind(RestController.class).toInstance(restController);
          // Setup the RestHandlers
          if (NetworkModule.HTTP_ENABLED.get(settings)) {
              Multibinder<RestHandler> restHandlers = Multibinder.newSetBinder(binder(), RestHandler.class);
              Multibinder<AbstractCatAction> catHandlers = Multibinder.newSetBinder(binder(), AbstractCatAction.class);
              for (Class<? extends RestHandler> handler : setupRestHandlers(actionPlugins)) {
                  bind(handler).asEagerSingleton();
                  if (AbstractCatAction.class.isAssignableFrom(handler)) {
                      catHandlers.addBinding().to(handler.asSubclass(AbstractCatAction.class));
                  } else {
                      restHandlers.addBinding().to(handler);
                  }
              }
          }
      }
  }
  public RestController getRestController() {
      return restController;
  }
}

RestMainAction.java

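With the mechanism above in mind, let's look at RestMainAction.prepareRequest() again.

It calls client.execute() with MainAction.INSTANCE. Because ActionModule binds a GenericAction -> TransportAction map for NodeClient, this call ultimately invokes TransportMainAction.doExecute().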

public class RestMainAction extends BaseRestHandler {
  @Inject
  public RestMainAction(Settings settings, RestController controller) {
      super(settings);
      controller.registerHandler(GET, "/", this);
      controller.registerHandler(HEAD, "/", this);
  }
  @Override
  public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
      logger.info("A message logged here shows up in the log.");
      return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) {
          @Override
          public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
              return convertMainResponse(mainResponse, request, builder);
          }
      });
  }
}
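
The dispatch from an action singleton to its transport action can be sketched without any Elasticsearch dependency. The block below is only an illustration: Action, TransportHandler, and Client are hypothetical stand-ins for GenericAction, TransportAction, and NodeClient, and the real classes carry far more machinery (Guice bindings, task management, action filters).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Simplified stand-ins for the real Elasticsearch types; every name here is hypothetical.
final class ActionDispatchSketch {

    /** Plays the role of a GenericAction singleton such as MainAction.INSTANCE. */
    static final class Action {
        final String name;

        Action(String name) {
            this.name = name;
        }
    }

    /** Plays the role of a TransportAction: does the work, then reports to a listener. */
    interface TransportHandler {
        void doExecute(String request, Consumer<String> listener);
    }

    /** Plays the role of NodeClient: looks up the handler for an action and delegates. */
    static final class Client {
        private final Map<Action, TransportHandler> handlers = new HashMap<>();

        void register(Action action, TransportHandler handler) {
            // Mirror the "register only once" guard seen in ActionModule.
            if (handlers.putIfAbsent(action, handler) != null) {
                throw new IllegalArgumentException("action [" + action.name + "] is already registered");
            }
        }

        void execute(Action action, String request, Consumer<String> listener) {
            TransportHandler handler = handlers.get(action);
            if (handler == null) {
                throw new IllegalStateException("no handler registered for action [" + action.name + "]");
            }
            handler.doExecute(request, listener);
        }
    }

    static final Action MAIN = new Action("main");

    public static void main(String[] args) {
        Client client = new Client();
        // Registration: the counterpart of actions.register(SomeAction.INSTANCE, TransportSomeAction.class).
        client.register(MAIN, (request, listener) -> listener.accept("node info for " + request));
        // Execution: the REST layer only names the action; the client finds the handler.
        client.execute(MAIN, "GET /", System.out::println);
    }
}
```

The point of the design is that the REST handler never references the transport class directly. It only names the action, and the map built at startup decides which code runs.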

TransportMainAction.java

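This is the last stop. TransportMainAction.doExecute() builds a MainResponse and hands it back through the listener.

The main fields are the node name, version, cluster name, cluster UUID, and build information.

To confirm that this code really produces the response to "curl localhost:9200/", I modified it to wrap the UUID in square brackets, as shown below. Start ES with this change and call curl to check.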

public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {
  @Override
  protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
      ClusterState clusterState = clusterService.state();
      assert Node.NODE_NAME_SETTING.exists(settings);
      final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
      listener.onResponse(
          // the third argument is the cluster name; only the cluster UUID is wrapped in brackets for this test
          new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(),
                  "[" + clusterState.metaData().clusterUUID() + "]", Build.CURRENT, available));
  }
}

Verification

Running the command below shows that the cluster UUID in the response is now wrapped in "[" and "]".

$ curl localhost:9200
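
The same check can be done from Java instead of by eye. The sketch below is a minimal example under stated assumptions: it assumes the node listens on http://localhost:9200/ and that the root response serializes the UUID under a field named cluster_uuid. The class and method names are hypothetical, and the regex is a shortcut for this one field, not a general JSON parser.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for checking the bracketed UUID; not part of Elasticsearch.
final class BracketedUuidCheck {

    // Assumption: the UUID is serialized as "cluster_uuid" : "<value>".
    private static final Pattern CLUSTER_UUID =
            Pattern.compile("\"cluster_uuid\"\\s*:\\s*\"([^\"]*)\"");

    /** Extracts the cluster_uuid value from a JSON body, if the field is present. */
    static Optional<String> clusterUuid(String json) {
        Matcher matcher = CLUSTER_UUID.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    /** True when the value starts with "[" and ends with "]". */
    static boolean isBracketed(String value) {
        return value.length() >= 2 && value.startsWith("[") && value.endsWith("]");
    }

    /** Performs a plain HTTP GET and returns the body as a UTF-8 string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        try (InputStream in = connection.getInputStream()) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String body = fetch("http://localhost:9200/");
        String uuid = clusterUuid(body)
                .orElseThrow(() -> new IllegalStateException("no cluster_uuid field in response"));
        System.out.println(isBracketed(uuid)
                ? "modified build answered: " + uuid
                : "unmodified build answered: " + uuid);
    }
}
```

Run it once against a stock build and once against the modified build. Only the second run should report a bracketed value.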

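Conclusion

  • The core of the ES startup process is the Node.java constructor and its start() method.

  • A simple way to read the ES source is to follow the RestxxxAction classes in the org.elasticsearch.rest.action package, one feature at a time. (xxx stands for a * wildcard.)

  • For the example in this post, start from RestMainAction.

  • Other feature-by-feature starting points include:

    • Indexing: RestIndexAction

    • Get: RestGetAction

    • Search: RestSearchAction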

