Newer
Older
SensorHub / SensorHub.Servers / KafkaUtils.cs
root on 17 Sep 2021 1 KB first commit
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SensorHub.Servers
{
    /// <summary>
    /// Helper for publishing string messages to Kafka, configured from the
    /// application's <c>appSettings</c> section.
    /// </summary>
    public class KafkaUtils
    {
        /// <summary>Bootstrap server list, read from the <c>kafkaBrokers</c> app setting.</summary>
        private static readonly string BROKERS = ConfigurationManager.AppSettings["kafkaBrokers"];

        /// <summary>Topic name, read from the <c>kafkaTopic</c> app setting.</summary>
        public static readonly string TOPIC = ConfigurationManager.AppSettings["kafkaTopic"];

        /// <summary>
        /// Sends a single key-less string message to the given topic and waits up to
        /// 10 seconds for outstanding deliveries before the producer is disposed.
        /// The delivery outcome (offset or error reason) is written to the console.
        /// </summary>
        /// <param name="topic">Name of the Kafka topic to publish to.</param>
        /// <param name="message">Message payload, sent as the record value.</param>
        public static void produce(String topic, String message)
        {
            var conf = new ProducerConfig { BootstrapServers = BROKERS };

            // The messages must be interpolated strings ($"..."); as plain literals the
            // "{r....}" placeholders were printed verbatim instead of the actual
            // offset / error reason.
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            // NOTE(review): a new producer is built and disposed on every call; if this
            // method is called frequently, consider sharing one long-lived producer.
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                p.Produce(topic, new Message<Null, string> { Value = message }, handler);

                // Wait for up to 10 seconds for any in-flight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}