Apache Kafka: Producers


El punto más importante al utilizar Apache Kafka es el envío y consumo de mensajes, en este post explicaremos como funcionan los producers y consumers, si tienes dudas sobre como instalar tu cluster de kafka te recomendamos el post Primeros pasos con Apache Kafka en Español !.

Producers

Un producer es un componente que escribe mensajes en un topic, para poder hacerlo requiere de la siguiente información:

  • Uno o muchos brokers de kafka
  • El nombre de un topic al cuál va a escribir
  • El mensaje que va a depositar en el topic

Acknowledges

La forma en la que podemos asegurar que un mensaje se entrego de forma exitosa es a través del uso de acknowledges, en los producers de kafka contamos con los siguientes:

  • acks=0 : Define que no es necesario recibir un ack por cada mensaje, esto mejora el performance de forma importante pero es posible perder mensajes.
  • acks=1 : Define que es necesario un ack del broker líder que recibe el mensaje, esto asegura que el líder recibió el mensaje pero no asegura que todas las replicas lo tienen.
  • acks=all : Define que es necesario un ack del broker líder y todas las replicas, esto asegura que tanto el líder como todas las replicas recibieron el mensaje.

Podemos concluir con lo anterior que entre menor sea el número de acks tendremos un mejor performance pero más probabilidades de perder datos y entre más acks tendremos un menor performance pero sin perdida de datos. Debemos considerar esto a la hora de diseñar nuestra aplicación.

Messages keys

Cuando utilizamos un producer podemos enviar cualquier información que necesitemos, pero recordemos que al contar con muchas particiones nosotros no podemos definir a que partición se mandará el mensaje, pero esto trae la pregunta, ¿Cómo puedo asegurar el orden de mis mensajes? La respuesta es Haciendo uso de message keys.

Adicional al mensaje es posible indicar un message key, con esto, todos los mensajes que cuenten con la misma message key se entregarán en la misma partición, si recordamos kafka asegura el orden de los mensajes que se encuentren dentro de la misma partición.

Programando nuestro producer

Una vez que entendemos la teoría el siguiente paso es programarlo, para esto programaremos nuestro kafka producer, para esto sigamos los siguientes pasos:

Paso 1 Configurando el proyecto

El primer paso será configurar nuestro proyecto, para esto agregaremos las siguientes entradas a nuestro archivo pom.xml:


	
		org.apache.kafka
		kafka-clients
		1.0.1
	


	
		
			maven-compiler-plugin
			3.2
			
				1.8
				1.8
			
		
	

Con lo anterior definiremos la dependencia a utilizar para crear un producer y el plugin para definir la versión 1.8 de java.

Paso 2 Definiendo de errores

El siguiente paso será crear algunas excepciones a utilizar en nuestro producer:


/**
 * @author raidentrance
 *
 */
public enum KafkaErrors {
	PRODUCER_NOT_CONNECTED("Not alive connection available");

	private String message;

	private KafkaErrors(String message) {
		this.message = message;
	}

	public String getMessage() {
		return message;
	}
}

La enumeración anterior será utilizada para definir los mensajes de error que se generen en nuestro producer.

/**
 * @author raidentrance
 *
 */
public class KafkaProducerException extends Exception {

	private static final long serialVersionUID = -2134618105767008561L;

	public KafkaProducerException(String message) {
		super(message);
	}
}

La clase anterior define la excepción que se propagará en caso de que una excepción con Kafka suceda.

Paso 3 Creando el kafka producer

Una ve que contamos con las clases necesarias crearemos un pequeño controller para enviar los mensajes a kafka.


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.raidentrance.kafka.error.KafkaErrors;
import com.raidentrance.kafka.error.KafkaProducerException;

/**
 * @author raidentrance
 *
 */
public class KafkaMessageProducer {
	private Properties properties;
	private Producer producer;

	public KafkaMessageProducer(Properties properties) {
		this.properties = properties;
	}

	public void connect() {
		producer = new KafkaProducer(properties);
	}

	public void sendMessage(String topic, String key, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, key, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void sendMessage(String topic, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void disconnect() {
		producer.close();
	}
}

El controller anterior define los siguientes métodos:

  • El método connect será utilizado para crear nuestro producer
  • El método

    void sendMessage(String topic, String key, String value) :  Se utilizará para enviar mensajes al topic especificado indicando una llave.

  • El método

    void sendMessage(String topic, String value) : Se utilizará para enviar mensajes al topic especificado sin indicar una llave.

Paso 4 Probando todo junto

Una vez que tenemos todo listo el siguiente paso será enviar mensajes, para esto crearemos el siguiente método main:

public static void main(String[] args) throws KafkaProducerException {
	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "kafka_host:9092");
	props.setProperty("key.serializer", StringSerializer.class.getName());
	props.setProperty("value.serializer", StringSerializer.class.getName());
	props.setProperty("acks", "1");
	props.setProperty("retries", "3");
	KafkaMessageProducer producer = new KafkaMessageProducer(props);
	producer.connect();
	producer.sendMessage("geeks-mexico", "geek-key", "raidentrance");
	producer.disconnect();
}

El código anterior muestra las propiedades necesarias para enviar y recibir mensajes a kafka y cómo mandar mensajes al topic geeks-mexico utilizando la llave geek-key el mensaje raidentrance.

Si te gusta el contenido y quieres enterarte cuando realicemos un post nuevo síguenos en nuestras redes sociales https://twitter.com/geeks_mx y https://www.facebook.com/geeksJavaMexico/.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

 

Anuncios

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

w

Conectando a %s