Puppet Class: confluent::kafka::broker
- Inherits:
 - confluent::params
 
- Defined in:
 - manifests/kafka/broker.pp
 
Overview
Class is used to install and configure an Apache Kafka Broker using the Confluent installation packages.
        50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173  | 
      
        # File 'manifests/kafka/broker.pp', line 50
class confluent::kafka::broker (
  Integer $broker_id,
  Hash[String, Variant[String, Integer, Boolean]] $config               = {},
  Hash[String, Variant[String, Integer, Boolean]] $logging_config       = $::confluent::params::kafka_logging_config,
  Hash[String, Variant[String, Integer, Boolean]] $environment_settings = {},
  Stdlib::Unixpath $config_path                                         = $::confluent::params::kafka_config_path,
  Stdlib::Unixpath $logging_config_path                                 =
  $::confluent::params::kafka_logging_config_path,
  Stdlib::Unixpath $environment_file                                    = $::confluent::params::kafka_environment_path,
  Variant[Stdlib::Unixpath, Array[Stdlib::Unixpath]] $data_path         = $::confluent::params::kafka_data_path,
  Stdlib::Unixpath $log_path                                            = $::confluent::params::kafka_log_path,
  String $user                                                          = $::confluent::params::kafka_user,
  String $service_name                                                  = $::confluent::params::kafka_service,
  Boolean $manage_service                                               = $::confluent::params::kafka_manage_service,
  Enum['running', 'stopped'] $service_ensure                            = $::confluent::params::kafka_service_ensure,
  Boolean $service_enable                                               = $::confluent::params::kafka_service_enable,
  Integer $file_limit                                                   = $::confluent::params::kafka_file_limit,
  Integer $stop_timeout_secs                                            = $::confluent::params::kafka_stop_timeout_secs,
  Boolean $manage_repository                                            = $::confluent::params::manage_repository,
  String $heap_size                                                     = $::confluent::params::kafka_heap_size,
  Boolean $restart_on_logging_change                                    = true,
  Boolean $restart_on_change                                            = true,
  Variant[String, Array[String]] $zookeeper_connect                     = 'localhost:2181'
) inherits confluent::params {
  include ::confluent
  include ::confluent::kafka
  if($manage_repository) {
    include ::confluent::repository
  }
  $default_config = {
    'broker.id'                                => $broker_id,
    'log.dirs'                                 => join(any2array($data_path), ','),
    'confluent.support.customer.id'            => 'anonymous',
    'confluent.support.metrics.enable'         => true,
    'group.initial.rebalance.delay.ms'         => 0,
    'log.retention.check.interval.ms'          => 300000,
    'log.retention.hours'                      => 168,
    'log.segment.bytes'                        => 1073741824,
    'num.io.threads'                           => 8,
    'num.network.threads'                      => 3,
    'num.partitions'                           => 1,
    'num.recovery.threads.per.data.dir'        => 1,
    'offsets.topic.replication.factor'         => 3,
    'socket.receive.buffer.bytes'              => 102400,
    'socket.request.max.bytes'                 => 104857600,
    'socket.send.buffer.bytes'                 => 102400,
    'transaction.state.log.min.isr'            => 2,
    'transaction.state.log.replication.factor' => 3,
    'zookeeper.connect'                        => join(any2array($zookeeper_connect), ','),
    'zookeeper.connection.timeout.ms'          => 6000,
  }
  $actual_config = merge($default_config, $config)
  confluent::properties { $service_name:
    ensure => present,
    path   => $config_path,
    config => $actual_config
  }
  $default_environment_settings = {
    'KAFKA_HEAP_OPTS'  => "-Xmx${heap_size}",
    'KAFKA_OPTS'       => '-Djava.net.preferIPv4Stack=true',
    'GC_LOG_ENABLED'   => true,
    'LOG_DIR'          => $log_path,
    'KAFKA_LOG4J_OPTS' => "-Dlog4j.configuration=file:${logging_config_path}"
  }
  $actual_environment_settings = merge($default_environment_settings, $environment_settings)
  confluent::environment { $service_name:
    ensure => present,
    path   => $environment_file,
    config => $actual_environment_settings
  }
  confluent::logging { $service_name:
    path   => $logging_config_path,
    config => $logging_config
  }
  user { $user:
    ensure => present
  } ->
  file { [$log_path, $data_path]:
    ensure  => directory,
    owner   => $user,
    group   => $user,
    recurse => true,
    recurselimit => 1,
    tag     => '__confluent__'
  }
  confluent::systemd::unit { $service_name:
    config => {
      'Unit'    => {
        'Description' => 'Apache Kafka by Confluent'
      },
      'Service' => {
        'User'            => $user,
        'EnvironmentFile' => $environment_file,
        'ExecStart'       => "/usr/bin/kafka-server-start ${config_path}",
        'ExecStop'        => '/usr/bin/kafka-server-stop',
        'LimitNOFILE'     => $file_limit,
      }
    }
  }
  if($manage_service) {
    service { $service_name:
      ensure => $service_ensure,
      enable => $service_enable,
      tag    => '__confluent__'
    }
    if($restart_on_change) {
      Confluent::Systemd::Unit[$service_name] ~> Service[$service_name]
      Confluent::Environment[$service_name] ~> Service[$service_name]
      Confluent::Properties[$service_name] ~> Service[$service_name]
      if($restart_on_logging_change) {
        Confluent::Logging[$service_name] ~> Service[$service_name]
      }
    }
  }
}
       |