[ Apache Kafka](http://kafka.apache.org/)は、大量のリアルタイムデータを効果的に処理するように設計された人気の分散メッセージブローカーです。 Kafkaクラスターは、拡張性と耐障害性が高いだけでなく、[ActiveMQ](http://activemq.apache.org/)や[RabbitMQ)](https://www.rabbitmq.com/)などの他のメッセージブローカーよりもスループットが高くなっています。これは公開/サブスクライブメッセージングシステムとして一般的に使用されますが、公開されたメッセージの永続的なストレージを提供するため、多くの組織がログの集約にも使用します。
公開/サブスクライブメッセージングシステムを使用すると、消費者の数やメッセージの処理方法に関係なく、1人以上のプロデューサーがメッセージを公開できます。サブスクライブされたクライアントには、更新と新しいメッセージの作成について自動的に通知されます。このシステムは、クライアントが定期的にポーリングして新しいメッセージが利用可能かどうかを判断するシステムよりも効率的でスケーラブルです。
このチュートリアルでは、Ubuntu18.04にApacheKafka1.1.0をインストールして使用します。
続行するには、次のものが必要です。
Kafkaはネットワーク経由でリクエストを処理できるため、専用のユーザーを作成する必要があります。 Kafkaサーバーが危険にさらされた場合、これによりUbuntuコンピューターへの損傷を最小限に抑えることができます。このステップでは専用の** kafka **ユーザーを作成しますが、Kafkaのセットアップが完了したら、このサーバーで他のタスクを実行するために別の非rootユーザーを作成する必要があります。
root以外のsudoユーザーとしてログインし、次の useradd
コマンドを使用して、** kafka **という名前のユーザーを作成します。
sudo useradd kafka -m
-m
フラグは、ユーザーのホームディレクトリが作成されることを保証します。この / home / kafka
ホームディレクトリは、次のセクションのコマンドを実行するためのワークスペースディレクトリとして機能します。
passwd
を使用して、次のパスワードを設定します。
sudo passwd kafka
このコマンドを使用して、** kafka **ユーザーを sudo
グループ adduser
に追加し、Kafka依存関係をインストールするために必要な権限を付与します。
sudo adduser kafka sudo
これで、** kafka **ユーザーの準備が整いました。このアカウントにログインするには、次の su
メソッドを使用します。
su -l kafka
Kafka固有のユーザーを作成したので、Kafkaバイナリのダウンロードと解凍に進むことができます。
** kafka バイナリファイルをダウンロードして、 kafka **ユーザーホームディレクトリの専用フォルダに解凍しましょう。
まず、 / home / kafka
に Downloads
という名前のディレクトリを作成して、ダウンロードを保存します。
mkdir ~/Downloads
curl
を使用して、Kafkaのバイナリファイルをダウンロードします。
curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz"-o ~/Downloads/kafka.tgz
kafka
というディレクトリを作成し、このディレクトリに移動します。これは、Kafkaインストールのベースディレクトリになります。
mkdir ~/kafka && cd ~/kafka
次の tar
コマンドを使用して、ダウンロードしたアーカイブを抽出します。
tar -xvzf ~/Downloads/kafka.tgz --strip 1
--strip 1
フラグを指定して、アーカイブ〜/ kafka /
自体のコンテンツが抽出され、その中の別のディレクトリ(たとえば、〜/ kafka / kafka_2.12-1.1.0 /
)にないようにします。エキス。
バイナリファイルのダウンロードと解凍が正常に完了したので、トピックを削除できるようにKafkaを構成し続けることができます。
Kafkaのデフォルトの動作では、メッセージを公開できるトピック、カテゴリ、グループ、またはフィード名を削除できません。それを変更するには、構成ファイルを編集しましょう。
Kafkaの構成オプションは、 server.properties
で指定されています。 nano
または選択した別のエディターでこのファイルを開きます。
nano ~/kafka/config/server.properties
Kafkaトピックを削除できる設定を追加しましょう。ファイルの最後に以下を追加します。
delete.topic.enable =true
ファイルを保存して nano
を終了します。 Kafkaを構成したので、実行するsystemdユニットファイルの作成に進み、起動時に有効にすることができます。
このセクションでは、Kafkaサービス用の[systemdユニットファイル](https://www.digitalocean.com/community/tutorials/understanding-systemd-units-and-unit-files)を作成します。これにより、他のLinuxサービスと同じ方法で、Kafkaの起動、停止、再起動などの一般的なサービス操作を実行できます。
Zookeeperは、Kafkaがクラスターのステータスと構成を管理するために使用するサービスです。これは通常、多くの分散システムで不可欠なコンポーネントとして使用されます。詳細については、公式の[Zookeeperドキュメント](https://zookeeper.apache.org/doc/current/index.html)をご覧ください。
zookeeper
のユニットファイルを作成します。
sudo nano /etc/systemd/system/zookeeper.service
ファイルに次のユニット定義を入力します。
[ Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[ Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[ Install]
WantedBy=multi-user.target
[Unit]
の部分は、Zookeeperにネットワークが必要であり、ファイルシステムが起動する前に準備ができていることを指定します。
[Service]
セクションは、systemdがサービスを開始および停止するために zookeeper-server-start.sh
および zookeeper-server-stop.sh
シェルファイルを使用する必要があることを指定します。また、Zookeeperが異常終了した場合に自動的に再起動するように指定しています。
次に、次の kafka
コンテンツのsystemdサービスファイルを作成します。
sudo nano /etc/systemd/system/kafka.service
ファイルに次のユニット定義を入力します。
[ Unit]
Requires=zookeeper.service
After=zookeeper.service
[ Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[ Install]
WantedBy=multi-user.target
[Unit]
セクションは、このユニットファイルが zookeeper.service
に依存することを指定します。これにより、 kafa
サービスの開始時に zookeeper
が自動的に開始されます。
[Service]
セクションは、systemdがサービスを開始および停止するために kafka-server-start.sh
および kafka-server-stop.sh
シェルファイルを使用する必要があることを指定します。また、Kafkaが異常終了した場合に自動的に再起動するように指定しています。
ユニットが定義されたので、次のコマンドを使用してKafkaを起動します。
sudo systemctl start kafka
サーバーが正常に起動したことを確認するには、デバイス kafka
のログを確認します。
journalctl -u kafka
次のような出力が表示されます。
Jul 1718:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
これで、ポート 9092
でリッスンしているKafkaサーバーができました。
kafka
サービスを開始しましたが、サーバーを再起動しても自動的には開始されません。サーバーの起動時に kafka
を有効にするには、次のコマンドを実行します。
sudo systemctl enable kafka
サービスを開始して有効にしたので、インストールを確認しましょう。
** "Hello World" **メッセージを公開して使用し、Kafkaサーバーが正しく実行されていることを確認しましょう。 Kafkaでメッセージを公開するには、次のものが必要です。
まず、次の名前を入力して、 TutorialTopic
テーマを作成します。
~ /kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic TutorialTopic
kafka-console-producer.sh
スクリプトを使用して、コマンドラインからジェネレーターを作成できます。これは、Kafkaサーバーのホスト名、ポート、およびトピック名をパラメーターとして想定しています。
次のように入力して、文字列 " Hello、World "
を TutorialTopic
トピックに公開します。
echo "Hello, World"|~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092--topic TutorialTopic >/dev/null
次に、 kafka-console-consumer.sh
スクリプトを使用して、Kafkaコンシューマーを作成できます。 ZooKeeperサーバーのホスト名とポート、およびテーマ名をパラメーターとして想定しています。
次のコマンドは、 TutorialTopic
からのメッセージを使用します。消費者が開始する前に公開されたメッセージの消費を許可する --from-beginning
フラグの使用に注意してください。
~ /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092--topic TutorialTopic --from-beginning
構成に問題がない場合は、ターミナルに「Hello、World」と表示されます。
Hello, World
スクリプトは引き続き実行され、トピックにさらにメッセージが投稿されるのを待ちます。新しいターミナルを開いて、プロデューサーを開始してさらにメッセージを投稿してください。消費者の出力でそれらを見ることができるはずです。
テストが完了したら、 CTRL + C
を押してユーザースクリプトを停止します。インストールをテストしたので、KafkaTのインストールに進みましょう。
[ KafkaT](https://github.com/airbnb/kafkat)は、Airbnbのツールであり、Kafkaクラスターに関する詳細情報をより簡単に表示し、コマンドラインから特定の管理タスクを実行できます。 Rubyの逸品なので、使うにはRubyが必要です。依存する他のgemをビルドするには、この build-essential
パッケージも必要です。 apt
でインストール:
sudo apt install ruby ruby-dev build-essential
これで、gemコマンドを使用してKafkaTをインストールできます。
sudo gem install kafkat
KafkaTは、 .kafkatcfg
構成ファイルを使用して、Kafkaサーバーのインストールおよびログディレクトリを決定します。また、KafkaTをZooKeeperインスタンスにポイントするエントリが必要です。
.kafkatcfg
という名前の新しいファイルを作成します。
nano ~/.kafkatcfg
次の行を追加して、KafkaサーバーとZookeeperインスタンスに関する必要な情報を指定します。
{" kafka_path":"~/kafka","log_path":"/tmp/kafka-logs","zk_path":"localhost:2181"}
これで、KafkaTを使用できます。まず、これを使用して、すべてのKafkaパーティションに関する詳細情報を表示できます。
kafkat partitions
次の出力が表示されます。
Topic Partition Leader Replicas ISRs
TutorialTopic 00[0][0]
__ consumer_offsets 00[0][0]......
クライアント関連の情報を保存するために使用されるKafkaの内部トピックである TutorialTopic
と __consumer_offsets
が表示されます。 __consumer_offsets
で始まる行は無視してかまいません。
KafkaTの詳細については、KafkaTの[GitHubリポジトリ](https://github.com/airbnb/kafkat)を参照してください。
より多くのUbuntu18.04コンピューターを使用してマルチエージェントクラスターを作成する場合は、新しいコンピューターごとに手順1、手順4、および手順5を繰り返す必要があります。さらに、 server.properties
ファイルで各変更を行う必要があります。
broker.id
の属性の値は、クラスター全体で一意になるように変更する必要があります。この属性は、クラスター内の各サーバーを一意に識別し、任意の文字列をその値として使用できます。たとえば、 " server1 "
、 " server2 "
などです。zookeeper.connect
のプロパティの値は、すべてのノードが同じZooKeeperインスタンスを指すように変更する必要があります。この属性は、Zookeeperインスタンスのアドレスを指定し、 の後に続きます<HOSTNAME/IP_ADDRESS>:<PORT>
フォーマット。たとえば、 " 203.0.113.0:2181 "、" "203.0.113.1:2181"
などです。クラスタに複数のZooKeeperインスタンスを設定する場合は、各ノードの zookeeper.connect
プロパティの値を、すべてのZooKeeperインスタンスのIPアドレスとポート番号を一覧表示する同じコンマ区切りの文字列にする必要があります。
すべてのインストールが完了したので、** kafka **ユーザーの管理者権限を削除できます。これを行う前に、他の非rootsudoユーザーとしてログアウトして再度ログインしてください。同じシェルセッションをまだ実行している場合は、「exit」と入力してこのチュートリアルを開始します。
** kafka **ユーザーをsudoグループから削除します。
sudo deluser kafka sudo
Kafkaサーバーのセキュリティをさらに向上させるには、このコマンドを使用して、** kafka **ユーザーのパスワード passwd
をロックします。これにより、誰もこのアカウントでサーバーに直接ログインできないようになります。
sudo passwd kafka -l
現時点では、rootまたはsudoユーザーのみが次のコマンドを入力して kafka
としてログインできます。
sudo su - kafka
今後、ロックを解除したい場合は、 passwd
の次の -u
オプションを使用してください。
sudo passwd kafka -u
これで、** kafka **ユーザーの管理者権限が正常に制限されました。
これで、UbuntuサーバーでApacheKafkaを安全に実行できます。 [Kafkaクライアント](https://cwiki.apache.org/confluence/display/KAFKA/Clients)(ほとんどのプログラミング言語で使用可能)を使用して、プロジェクトで使用するKafkaプロデューサーとコンシューマーを作成できます。 Kafkaの詳細については、Kafkaの[ドキュメント](http://kafka.apache.org/documentation.html)を参照することもできます。
その他のUbuntuチュートリアルについては、[Tencent Cloud + Community](https://cloud.tencent.com/developer?from=10680)にアクセスして詳細を確認してください。
参照:「Ubuntu18.04にApacheKafkaをインストールする方法」
Recommended Posts