[ Apache Kafka](http://kafka.apache.org/)は、大量のリアルタイムデータを効果的に処理するように設計された人気の分散メッセージブローカーです。 Kafkaクラスターは、拡張性と障害耐性が高いだけでなく、ActiveMQやRabbitMQなどの他のメッセージブローカーと比較してスループットが高くなっています。通常、公開/サブスクライブメッセージングシステムとして使用されますが、多くの公開されたメッセージの永続的なストレージを提供するため、組織はログの集約にも使用します。
公開/サブスクライブメッセージングシステムを使用すると、1人以上のプロデューサーが、消費者の数やメッセージの処理方法に関係なく、メッセージを公開できます。サブスクライブされたクライアントには、更新と新しいメッセージの作成について自動的に通知されます。このシステムは、クライアントが定期的にポーリングして新しいメッセージが利用可能かどうかを判断するシステムよりも効率的でスケーラブルです。
このチュートリアルでは、CentOS7にApacheKafka1.1.0をインストールして使用します。
続行するには、次のものが必要です。
Kafkaはネットワーク経由でリクエストを処理できるため、専用のユーザーを作成する必要があります。 Kafkaサーバーが危険にさらされた場合、これによりCentOSマシンへの損傷を最小限に抑えることができます。このステップでは専用の** kafka **ユーザーを作成しますが、Kafkaのセットアップが完了したら、このサーバーで他のタスクを実行するために別の非rootユーザーを作成する必要があります。
sudo権限を持つ非rootユーザーとしてログインし、 useradd
コマンドを使用して** kafka **という名前のユーザーを作成します。
sudo useradd kafka -m
-m
フラグは、ユーザーのホームディレクトリが作成されることを保証します。このホームディレクトリ / home / kafka
は、次のセクションのコマンドを実行するためのワークスペースディレクトリとして機能します。
passwd
を使用してパスワードを設定します。
sudo passwd kafka
adduser
コマンドを使用して** kafka **ユーザーを wheel
グループに追加し、Kafkaの依存関係をインストールするために必要な権限を付与します。
sudo usermod -aG wheel kafka
これで、** kafka **ユーザーの準備が整いました。 su
メソッドを使用して、このアカウントにログインします。
su -l kafka
Kafka固有のユーザーを作成したので、Kafkaバイナリのダウンロードと解凍に進むことができます。
** kafka バイナリファイルをダウンロードして、 kafka **ユーザーホームディレクトリの専用フォルダに解凍しましょう。
まず、 / home / kafka
にディレクトリ Downloads
を作成して、ダウンロードを保存します。
mkdir ~/Downloads
curl
を使用してKafkaバイナリファイルをダウンロードします。
curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz"-o ~/Downloads/kafka.tgz
kafka
というディレクトリを作成し、このディレクトリに移動します。これは、Kafkaインストールのベースディレクトリになります。
mkdir ~/kafka && cd ~/kafka
tar
コマンドを使用して、ダウンロードしたアーカイブを抽出します。
tar -xvzf ~/Downloads/kafka.tgz --strip 1
--strip 1
フラグを指定して、アーカイブ〜/ kafka /
自体のコンテンツが抽出され、その中の別のディレクトリ(たとえば、〜/ kafka / kafka_2.12-1.1.0 /
)にないようにします。エキス。
バイナリファイルのダウンロードと解凍が正常に完了したので、トピックを削除できるようにKafkaを構成し続けることができます。
Kafkaのデフォルトの動作では、メッセージを公開できるトピック、カテゴリ、グループ、またはフィード名を削除できません。それを変更するには、構成ファイルを編集しましょう。
Kafkaの構成オプションは、 server.properties
で指定されています。 vi
またはお気に入りのエディターでこのファイルを開きます。
vi ~/kafka/config/server.properties
Kafkaトピックを削除できる設定を追加しましょう。 i
を押してテキストを挿入し、ファイルの最後に以下を追加します。
delete.topic.enable =true
終了したら、 ESC
を押して挿入モードを終了し、:wq
を押してファイルに変更を書き込んで終了します。 Kafkaを構成したので、実行するsystemdユニットファイルの作成に進み、起動時に有効にすることができます。
このセクションでは、Kafkaサービスのsystemdユニットファイルを作成します。これにより、他のLinuxサービスと同じ方法で、Kafkaの起動、停止、再起動などの一般的なサービス操作を実行できます。
Zookeeperは、Kafkaがクラスターのステータスと構成を管理するために使用するサービスです。これは通常、多くの分散システムで不可欠なコンポーネントとして使用されます。詳細については、公式の[Zookeeperドキュメント](https://zookeeper.apache.org/doc/current/index.html)をご覧ください。
zookeeper
のユニットファイルを作成します。
sudo vi /etc/systemd/system/zookeeper.service
ファイルに次のユニット定義を入力します。
[ Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[ Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[ Install]
WantedBy=multi-user.target
[ Unit]
の部分は、Zookeeperにネットワークが必要であり、開始する前にファイルシステムの準備ができていることを指定します。
[Service]
セクションは、systemdがサービスを開始および停止するために zookeeper-server-start.sh
および zookeeper-server-stop.sh
シェルファイルを使用する必要があることを指定します。また、Zookeeperが異常終了した場合に自動的に再起動するように指定しています。
次に、 kafka
のsystemdサービスファイルを作成します。
sudo vi /etc/systemd/system/kafka.service
ファイルに次のユニット定義を入力します。
[ Unit]
Requires=zookeeper.service
After=zookeeper.service
[ Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[ Install]
WantedBy=multi-user.target
[Unit]
セクションは、このユニットファイルが zookeeper.service
に依存することを指定します。これにより、 kafa
サービスの開始時に zookeeper
が自動的に開始されます。
[Service]
セクションは、systemdがサービスを開始および停止するために kafka-server-start.sh
および kafka-server-stop.sh
シェルファイルを使用する必要があることを指定します。また、Kafkaが異常終了した場合に自動的に再起動するように指定しています。
ユニットが定義されたので、次のコマンドを使用してKafkaを起動します。
sudo systemctl start kafka
サーバーが正常に起動したことを確認するには、 kafka
ユニットのログを確認してください。
journalctl -u kafka
次のような出力が表示されます。
Jul 1718:38:59 kafka-centos systemd[1]: Started kafka.service.
これで、ポート 9092
でリッスンしているKafkaサーバーができました。
kafka
サービスを開始しましたが、サーバーを再起動しても自動的には開始されません。サーバーの起動時に kafka
を有効にするには、次のコマンドを実行します。
sudo systemctl enable kafka
サービスを開始して有効にしたので、インストールを確認しましょう。
** "Hello World" **メッセージを公開して使用し、Kafkaサーバーが正しく実行されていることを確認しましょう。 Kafkaでメッセージを公開するには、次のものが必要です。
まず、次の名前を入力してトピック TutorialTopic
を作成します。
~ /kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic TutorialTopic
kafka-console-producer.sh
スクリプトを使用して、コマンドラインからジェネレーターを作成できます。これは、Kafkaサーバーのホスト名、ポート、およびトピック名をパラメーターとして想定しています。
次のように入力して、文字列 " Hello、World "
を TutorialTopic
トピックに公開します。
echo "Hello, World"|~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092--topic TutorialTopic >/dev/null
次に、 kafka-console-consumer.sh
スクリプトを使用して、Kafkaコンシューマーを作成できます。 ZooKeeperサーバーのホスト名とポート、およびテーマ名をパラメーターとして想定しています。
次のコマンドは、 TutorialTopic
からのメッセージを使用します。 --from-beginning
フラグの使用に注意してください。これにより、消費者が開始する前に公開されたメッセージを消費できます。
~ /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092--topic TutorialTopic --from-beginning
構成に問題がない場合は、ターミナルに「Hello、World」と表示されます。
Hello, World
スクリプトは引き続き実行され、トピックにさらにメッセージが投稿されるのを待ちます。新しいターミナルを開いて、プロデューサーを開始してさらにメッセージを投稿してください。あなたはcomsumerの出力でそれらを見ることができるはずです。
テストが完了したら、 CTRL + C
を押してユーザースクリプトを停止します。インストールをテストしたので、KafkaTのインストールに進みましょう。
KafkaTは、Airbnbのツールであり、Kafkaクラスターに関する詳細情報をより簡単に表示し、コマンドラインから特定の管理タスクを実行できます。 Rubyの逸品なので、使うにはRubyが必要です。また、依存する他のgemをビルドするには、 ruby-devel
とビルド関連のパッケージ( make
や gcc
など)が必要です。 yum
を使用してそれらをインストールします。
sudo yum install ruby ruby-devel make gcc patch
これで、gemコマンドを使用してKafkaTをインストールできます。
sudo gem install kafkat
KafkaTは、構成ファイルとして .kafkatcfg
を使用して、Kafkaサーバーのインストールおよびログディレクトリを決定します。また、KafkaTをZooKeeperインスタンスにポイントするエントリが必要です。
.kafkatcfg
という名前の新しいファイルを作成します。
vi ~/.kafkatcfg
次の行を追加して、KafkaサーバーとZookeeperインスタンスに関する必要な情報を指定します。
{" kafka_path":"~/kafka","log_path":"/tmp/kafka-logs","zk_path":"localhost:2181"}
これで、KafkaTを使用できます。まず、これを使用して、すべてのKafkaパーティションに関する詳細情報を表示できます。
kafkat partitions
次の出力が表示されます。
Topic Partition Leader Replicas ISRs
TutorialTopic 00[0][0]
__ consumer_offsets 00[0][0]......
クライアント関連の情報を保存するために使用されるKafkaの内部トピックである TutorialTopic
と __consumer_offsets
が表示されます。 __consumer_offsets
で始まる行は無視してかまいません。
より多くのCentOS7コンピューターを使用してマルチエージェントクラスターを作成する場合は、新しいコンピューターごとに手順1、手順4、および手順5を繰り返す必要があります。さらに、 server.properties
ファイルに次の変更を加える必要があります。
broker.id
プロパティの値は、クラスター全体で一意になるように変更する必要があります。この属性は、クラスター内の各サーバーを一意に識別し、任意の文字列をその値として使用できます。たとえば、 " server1 "
、 " server2 "
などです。zookeeper.connect
プロパティの値は、すべてのノードが同じZooKeeperインスタンスを指すように変更する必要があります。この属性は、Zookeeperインスタンスのアドレスを指定し、 の後に続きます<HOSTNAME/IP_ADDRESS>:<PORT>
フォーマット。たとえば、 " 203.0.113.0:2181 "
、 " 203.0.113.1:2181 "
などです。クラスタに複数のZooKeeperインスタンスを設定する場合、各ノードの zookeeper.connect
属性値は、すべてのZooKeeperインスタンスのIPアドレスとポート番号をリストする同じコンマ区切りの文字列である必要があります。
すべてのインストールが完了したので、** kafka **ユーザーの管理者権限を削除できます。これを行う前に、他の非rootsudoユーザーとしてログアウトして再度ログインしてください。同じシェルセッションをまだ実行している場合は、「exit」と入力してこのチュートリアルを開始します。
** kafka **ユーザーをsudoグループから削除します。
sudo gpasswd -d kafka wheel
Kafkaサーバーのセキュリティをさらに向上させるには、このコマンドを使用して、** kafka **ユーザーのパスワード passwd
をロックします。これにより、誰もこのアカウントでサーバーに直接ログインできないようになります。
sudo passwd kafka -l
現時点では、rootまたはsudoユーザーのみが kafka
コマンドを入力してログインできます。
sudo su - kafka
将来、ロックを解除する場合は、 passwd
の -u
オプションを使用してください。
sudo passwd kafka -u
これで、** kafka **ユーザーの管理者権限が正常に制限されました。
これで、CentOSサーバーでApacheKafkaを安全に実行できます。 Kafkaクライアント(ほとんどのプログラミング言語で使用可能)を使用して、プロジェクトで使用するKafkaプロデューサーとコンシューマーを作成できます。
Apache Kafka関連のチュートリアルのインストールの詳細については、[Tencent Cloud + Community](https://cloud.tencent.com/developer?from=10680)にアクセスして詳細を確認してください。
参照:「CentOS7にApacheKafkaをインストールする方法」
Recommended Posts