Wednesday, March 16, 2016

How to send messages to Flume (Avro or Thrift) in Java

Apache Flume is mostly used to transfer data from outside the Hadoop world into the Hadoop family. In Flume, every message transferred is called an "Event", an input is called a "Source", and an output is called a "Sink".

1. Flume Source & Sink types

Flume has many input and output types.

Here are some widely used source (input) types:

  • "syslog" reads syslog data, the logging facility that comes with Unix systems.
  • "avro" and "thrift" are very similar. Both are Apache projects. Flume listens on a given port for RPC calls, and the data is encoded with Avro or Thrift respectively.
  • "netcat" listens on a given port and turns each line of text into an event, just like a normal socket server.
  • "exec" gets its data from an external command, like 'tail -f your_log_file'.
  • "http" accepts Flume events by HTTP POST and GET.
  • "jms" converts messages from a JMS provider, e.g. ActiveMQ, into Flume events.
  • "kafka" reads messages from an Apache Kafka topic.
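
To make the "netcat" description concrete, here is a minimal sketch of a plain-socket client that writes newline-terminated lines, each of which a netcat source would turn into one event. It uses only the JDK. The class name NetcatLineClient, the host "localhost" and the port 44444 are assumptions for illustration and are not from my setup.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class NetcatLineClient {

  // Writes every string as one '\n'-terminated line over a TCP socket.
  // A netcat source on the other end treats each line as one event.
  public static void sendLines(String host, int port, List<String> lines) throws IOException {
    try (Socket socket = new Socket(host, port);
         Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
      for (String line : lines) {
        out.write(line);
        out.write('\n'); // explicit '\n' instead of the platform line separator
      }
      out.flush();
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical host and port: use the bind address and port of your own netcat source.
    sendLines("localhost", 44444, Arrays.asList("hello flume", "second event"));
  }
}
```

As far as I know the netcat source answers each line with an "OK" acknowledgement by default. This sketch does not read those replies, which should be fine for a handful of test lines.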

Here are some widely used sink (output) types:

  • "logger" logs events at INFO level. Typically useful for testing and debugging.
  • "hdfs" writes events into the Hadoop Distributed File System (HDFS).
  • "hive" writes events into a Hive table or partition.
  • "hbase" and "asynchbase" write data to HBase synchronously or asynchronously.
  • "avro" and "thrift": events sent to these sinks are turned into Avro or Thrift events and sent to the configured hostname/port pair.
  • "null" just discards all events, like /dev/null.
  • "kafka" publishes data to a Kafka topic.

2. Flume client for Avro/Thrift source.

If you are using an avro or thrift source as the message input, the following is a Java client that sends a message into Flume. This is useful for development or testing. For me, a client like this helps with debugging my Spark code.

package com.shengwang.demo;

import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class DemoMain {

  public static void main(String[] args) throws EventDeliveryException {
    // Host and port of the Flume agent's avro or thrift source
    String ip = "192.168.203.156";
    int port = 34343;

    // For an avro source, use the default (Avro) client instead:
    // RpcClient client = RpcClientFactory.getDefaultInstance(ip, port);
    RpcClient client = RpcClientFactory.getThriftInstance(ip, port);  // Thrift
    try {
      Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);
      client.append(event);  // throws EventDeliveryException if delivery fails
    } finally {
      client.close();  // release the connection even if append() fails
    }
  }
}
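
If you want to push more than one message per run, the same SDK also offers appendBatch(). The following is only a sketch under assumptions: the class name BatchDemo, the helper buildEvents and the "origin" header are made up for illustration, and the IP and port are the ones from my lab, so replace them with yours. It needs the flume-ng-sdk dependency on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

final class BatchDemo {

  // Builds one event per message and tags each with a hypothetical "origin" header.
  public static List<Event> buildEvents(List<String> messages) {
    List<Event> events = new ArrayList<>();
    for (String message : messages) {
      Map<String, String> headers = new HashMap<>();
      headers.put("origin", "batch-demo"); // illustrative header, not required by Flume
      events.add(EventBuilder.withBody(message.getBytes(StandardCharsets.UTF_8), headers));
    }
    return events;
  }

  public static void main(String[] args) {
    String ip = "192.168.203.156"; // lab address from this post, change to yours
    int port = 34343;
    RpcClient client = RpcClientFactory.getThriftInstance(ip, port);
    try {
      // Sends all events in one call instead of one RPC per event.
      client.appendBatch(buildEvents(Arrays.asList("hello flume", "hello again")));
    } catch (EventDeliveryException e) {
      System.err.println("Delivery failed: " + e.getMessage());
    } finally {
      client.close();
    }
  }
}
```

Headers travel with the event, so a downstream sink or interceptor can use them, for example for routing. Whether that is useful depends on your agent configuration.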

You need to add the following dependency to your pom.xml file:

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.0</version>
    </dependency>

Since I'm using CDH 5.4.9, the Flume version is 1.5.0 in my environment. Choose the version that matches yours. The dependency hierarchy looks like the image below.

image

3. Run it

In my lab's configuration, Flume has a thrift source and outputs events to Kafka, so I can use the Kafka console consumer to print out the messages. A message is sent from my Java client to Flume, transferred to a Kafka topic by Flume, and finally consumed and printed on the command line.

So first start kafka-console-consumer, a tool that comes with the Kafka package, on the server running Kafka. It will block and wait for messages. Then run our DemoMain client. The message "hello flume" will be printed on the screen.

image

