ビデオ: 07 Streaming Analytics - Flume - Web Server logs to HDFS - Customize Sink 2024
Hadoop分散ファイルシステム(HDFS)で終わるデータの一部は、データベースのロード操作や他の種類のバッチ処理によってそこに着陸することがありますが、アプリケーションログデータなど、ハイスループットのデータストリームに流れるデータをキャプチャする場合は、 Apache Flumeは、これを簡単に、効率的に、安全に行うための標準的な方法です。
Apache Flume はApache Software Foundationの別のトップレベルプロジェクトで、大量のストリーミングデータを集約してさまざまなソースから集中データストアに移動するための分散システムです。
<!別の言い方をすれば、FlumeはHDFSへのデータの連続的な取り込みのために設計されています。データはどのような種類のデータでもかまいませんが、FlumeはWebサーバーからのログデータなど、ログデータを処理するのに特に適しています。 Flumeが処理するデータの単位は、events と呼ばれます。イベントの例はログ・レコードです。 FlumeがHadoopクラスタ内でどのように動作するかを理解するには、Flumeが1つ以上のエージェントとして動作し、各エージェントに3つのプラグイン可能なコンポーネント(ソース、チャネル、およびシンク)があることを知る必要があります。 - 2 - >
ソース
はデータを検索し、チャネルに送信します。チャネル999は、データ待ち行列を保持し、ソースとシンクとの間の導管として機能し、流入流量が流出流量を超える場合に有用である。-
Sinks は、チャンネルから取り込まれたデータを処理し、HDFSなどの目的地に配信します。
-
<! - 3 - > エージェントには、実行する各コンポーネントの少なくとも1つが必要であり、各エージェントはJava Virtual Machine(JVM)の独自のインスタンスに含まれています。ソースによってチャネルに書き込まれたイベントは、シンクがトランザクションによってそれを除去するまで、そのチャネルから除去されない。ネットワーク障害が発生した場合、チャネルは、シンクがそれらをクラスタに書き込むことができるまで、イベントをキューに入れます。メモリ内チャネルはイベントをすばやく処理できますが、揮発性で回復できませんが、ファイルベースのチャネルは永続性を提供し、障害が発生した場合に回復できます。各エージェントは複数のソース、チャネル、およびシンクを有することができ、ソースは多くのチャネルに書き込むことができるが、シンクは1つのチャネルのみからデータを取り込むことができる。
-
エージェントはFlumeを実行しているJVMだけであり、Hadoopクラスタの各 エージェントノード
コレクタノード
にデータを送信します。それを他のHadoopツールで解析できるHDFSに書き込む。
あるエージェントからのシンクが別のエージェントからソースにデータを送信するように、エージェントを連鎖させることができます。Apacheのリモート呼び出しおよびシリアライゼーションフレームワークであるAvroは、データをコンパクトなバイナリ形式に効率的にシリアル化または変換するための便利なツールとして機能するため、Flumeを使用してネットワーク経由でデータを送信する通常の方法です。
Flumeのコンテキストでは、互換性が重要です。たとえば、AvroイベントにはAvroソースが必要であり、シンクは目的地に適したイベントを配信する必要があります。 このソース、チャネル、およびシンクの大きな連鎖を作るのはFlumeエージェントの設定です。これはJavaプロパティファイルのように構造化されたローカルテキストファイルに格納されています。複数のエージェントを同じファイルに設定できます。 flume-agentというサンプルファイルを見てください。 conf - shamanというエージェントを設定するように設定されています: #エージェントshaman:shamanのコンポーネントを特定します。 sources = netcat_s1 shaman。シンク= hdfs_w1シャーマン。 channels = in-mem_c1#ソースを設定する:シャーマン。ソース。 netcat_s1。タイプ=ネットキャットシャーマン。ソース。 netcat_s1。 bind = localhostシャーマン。ソース。 netcat_s1。ポート= 44444#シンクを描く:シャーマン。シンク。 hdfs_w1。タイプ= hdfsシャーマン。シンク。 hdfs_w1。 hdfs。パス= hdfs://シャーマン。シンク。 hdfs_w1。 hdfs。 writeFormat =テキストシャーマン。シンク。 hdfs_w1。 hdfs。 fileType = DataStream#シャーマン:メモリ内のイベントをバッファリングするチャネルを設定します。チャネル。 in-mem_c1。タイプ=メモリシャーマン。チャネル。 in-mem_c1。能力= 20000シャーマン。チャネル。 in-mem_c1。 transactionCapacity = 100#ソースとシンクをチャネルshamanにバインドします。ソース。 netcat_s1。 channels = in-mem_c1シャーマン。シンク。 hdfs_w1。 channels = in-mem_c1 構成ファイルには、エージェント内の各ソース、チャネル、およびシンクのプロパティーと、それらの接続方法が指定されています。この例では、エージェントshamanは、ポート44444のデータ(netcatへのメッセージ)、メモリ内のイベントデータをバッファするチャネル、およびイベントデータをコンソールに記録するシンクを受信するソースを持っています。 この設定ファイルは、複数のエージェントを定義するために使用されている可能性があります。ここでは、物事を単純に保つために1つだけを設定しています。
エージェントを起動するには、flume-ngというシェルスクリプトを使用します。これはFlume配布のbinディレクトリにあります。コマンドラインから、エージェントコマンドを発行し、構成ファイルへのパスとエージェント名を指定します。
次のサンプルコマンドは、Flumeエージェントを起動します。
flume-ngエージェント-f / -n shaman
Flumeエージェントのログには、ソース、チャネル、およびシンクが正常に開始したことを示すエントリが含まれている必要があります。
設定をさらにテストするには、別の端末からポート44444にtelnetで接続し、任意のテキスト文字列を入力してFlumeにイベントを送信します。すべてがうまくいけば、オリジナルのFlume端末は、エージェントのログに表示されるはずのログメッセージにイベントを出力します。