using Confluent.Kafka; using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Threading.Tasks; namespace SensorHub.Servers { public class KafkaUtils { private static readonly string BROKERS = ConfigurationManager.AppSettings["kafkaBrokers"]; public static readonly string TOPIC = ConfigurationManager.AppSettings["kafkaTopic"]; public static void produce(String topic, String message) { var conf = new ProducerConfig { BootstrapServers = BROKERS }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? "Delivered message to {r.TopicPartitionOffset}" : "Delivery Error: {r.Error.Reason}"); using (var p = new ProducerBuilder<Null, string>(conf).Build()) { p.Produce(topic, new Message<Null, string> { Value = message }, handler); // wait for up to 10 seconds for any inflight messages to be delivered. p.Flush(TimeSpan.FromSeconds(10)); } } } }