🤬
  • Add Python server implementation with build binary target.

    PiperOrigin-RevId: 464913450
    Change-Id: Ic217cc3a53eb9e9fae113022bd7d639f81f0e2fa
  • Loading...
  • John Y. Kim committed with Copybara-Service 2 years ago
    6d864272
    1 parent 54f6956b
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    plugin_server/py/plugin_server.py
     1 +# Copyright 2022 Google LLC
     2 +#
     3 +# Licensed under the Apache License, Version 2.0 (the "License");
     4 +# you may not use this file except in compliance with the License.
     5 +# You may obtain a copy of the License at
     6 +#
     7 +# http://www.apache.org/licenses/LICENSE-2.0
     8 +#
     9 +# Unless required by applicable law or agreed to in writing, software
     10 +# distributed under the License is distributed on an "AS IS" BASIS,
     11 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 +# See the License for the specific language governing permissions and
     13 +# limitations under the License.
     14 +"""Main gRPC server to execute Python Tsunami plugins."""
     15 +from concurrent import futures
     16 +import importlib
     17 +import pkgutil
     18 +import signal
     19 +import threading
     20 +import types
     21 + 
     22 +from absl import app
     23 +from absl import flags
     24 +from absl import logging
     25 +import grpc
     26 +from grpc_health.v1 import health
     27 +from grpc_health.v1 import health_pb2
     28 +from grpc_health.v1 import health_pb2_grpc
     29 +from grpc_reflection.v1alpha import reflection
     30 + 
     31 +from tsunami.plugin_server.py import plugin_service
     32 +from tsunami.plugin_server.py import tsunami_plugin
     33 +from tsunami.proto import plugin_service_pb2
     34 +from tsunami.proto import plugin_service_pb2_grpc
     35 + 
     36 +_HOST = 'localhost'
     37 + 
     38 +_PORT = flags.DEFINE_integer('port', 34567, 'port to listen on')
     39 +_THREADS = flags.DEFINE_integer('threads', 10,
     40 + 'number of worker threads in thread pool')
     41 +_OUTPUT = flags.DEFINE_string('log_output', '/tmp',
     42 + 'server execution log directory')
     43 + 
     44 + 
     45 +def main(unused_argv):
     46 + logging.use_absl_handler()
     47 + logging.get_absl_handler().use_absl_log_file('py_plugin_server',
     48 + _OUTPUT.value)
     49 + logging.set_verbosity(logging.INFO)
     50 + # Load plugins from tsunami_plugins repository.
     51 + plugin_pkg = importlib.import_module(
     52 + 'py_plugins')
     53 + _import_py_plugins(plugin_pkg)
     54 + 
     55 + server_addr = f'{_HOST}:{_PORT.value}'
     56 + server = grpc.server(futures.ThreadPoolExecutor(max_workers=_THREADS.value))
     57 + 
     58 + _configure_plugin_service(server)
     59 + _configure_health_service(server)
     60 + server.add_secure_port(server_addr, grpc.local_server_credentials())
     61 + 
     62 + server.start()
     63 + logging.info('Server started at %s.', server_addr)
     64 + 
     65 + # Java Process.destroy() sends SIGTERM
     66 + sig_term_received = threading.Event()
     67 + 
     68 + def on_sigterm(signum, frame):
     69 + logging.info('Got signal %s, %s', signum, frame)
     70 + sig_term_received.set()
     71 + 
     72 + signal.signal(signal.SIGTERM, on_sigterm)
     73 + sig_term_received.wait()
     74 + logging.info('Stopped RPC server, Waiting for RPCs to complete...')
     75 + server.stop(3).wait()
     76 + logging.info('Done stopping server')
     77 + 
     78 + 
     79 +def _import_py_plugins(plugin_pkg: types.ModuleType):
     80 + """Imports all Python Tsunami plugin modules."""
     81 + for _, name, is_pkg in pkgutil.walk_packages(plugin_pkg.__path__):
     82 + full_name = plugin_pkg.__name__ + '.' + name
     83 + pkg = importlib.import_module(full_name)
     84 + if is_pkg:
     85 + _import_py_plugins(pkg)
     86 + else:
     87 + logging.info('Loaded plugin module %s', full_name)
     88 + 
     89 + 
     90 +def _configure_plugin_service(server):
     91 + """Configures the main plugin service for handling plugin related gRPC requests."""
     92 + # Get all VulnDetector class implementations.
     93 + plugins = [cls() for cls in tsunami_plugin.VulnDetector.__subclasses__()]
     94 + servicer = plugin_service.PluginServiceServicer(
     95 + py_plugins=plugins, max_workers=_THREADS.value)
     96 + plugin_service_pb2_grpc.add_PluginServiceServicer_to_server(servicer, server)
     97 + 
     98 + 
     99 +def _configure_health_service(server):
     100 + """Configures gRPC health checking service for server health monitoring."""
     101 + health_servicer = health.HealthServicer(
     102 + experimental_non_blocking=True,
     103 + experimental_thread_pool=futures.ThreadPoolExecutor(
     104 + max_workers=_THREADS.value))
     105 + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
     106 + 
     107 + # Set all services to SERVING.
     108 + services = tuple(service.full_name
     109 + for service in plugin_service_pb2.DESCRIPTOR.services_by_name
     110 + .values()) + (reflection.SERVICE_NAME, health.SERVICE_NAME)
     111 + for service in services:
     112 + logging.info('Registering service %s', service)
     113 + health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
     114 + reflection.enable_server_reflection(services, server)
     115 + 
     116 + 
     117 +if __name__ == '__main__':
     118 + app.run(main)
     119 + 
Please wait...
Page is in error, reload to recover