Kafunk - F# Kafka client

Example

This example demonstrates a few uses of the Kafka client.

#r "kafunk.dll"
#r "FSharp.Control.AsyncSeq.dll"

open Kafunk
open System

let conn = Kafka.connHost "existential-host"

// metadata

let metadata = 
  Kafka.metadata conn (Metadata.Request([|"absurd-topic"|])) 
  |> Async.RunSynchronously

for b in metadata.brokers do
  printfn "broker|host=%s port=%i nodeId=%i" b.host b.port b.nodeId

for t in metadata.topicMetadata do
  printfn "topic|topic_name=%s topic_error_code=%i" t.topicName t.topicErrorCode
  for p in t.partitionMetadata do
    printfn "topic|topic_name=%s|partition|partition_id=%i" t.topicName p.partitionId


// producer

let producerCfg =
  ProducerConfig.create (
    topic = "absurd-topic", 
    partition = Partitioner.roundRobin, 
    requiredAcks = RequiredAcks.Local)
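
// ProducerConfig.create also accepts optional batching parameters
// (a hedged sketch; the values below are illustrative, not recommendations)

let producerCfgBatched =
  ProducerConfig.create (
    topic = "absurd-topic",
    partition = Partitioner.roundRobin,
    requiredAcks = RequiredAcks.Local,
    batchSizeBytes = 16384,
    batchLingerMs = 100)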

let producer =
  Producer.createAsync conn producerCfg
  |> Async.RunSynchronously

// produce single message

let prodRes =
  Producer.produce producer (ProducerMessage.ofBytes ("hello world"B))
  |> Async.RunSynchronously

printfn "partition=%i offset=%i" prodRes.partition prodRes.offset






// consumer

let consumerCfg = 
  ConsumerConfig.create ("consumer-group", "absurd-topic")

let consumer =
  Consumer.create conn consumerCfg

// commit on every message set

Consumer.consume consumer 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition
    do! Consumer.commitOffsets consumer (ConsumerMessageSet.commitPartitionOffsets ms) })
|> Async.RunSynchronously


// commit periodically


Consumer.consumePeriodicCommit consumer
  (TimeSpan.FromSeconds 10.0) 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition })
|> Async.RunSynchronously
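
// ConsumerMessageSet also provides offset helpers (firstOffset, lastOffset, lag);
// a hedged sketch logging consumer lag per message set

Consumer.consume consumer
  (fun (_:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "first_offset=%i last_offset=%i lag=%i"
      (ConsumerMessageSet.firstOffset ms)
      (ConsumerMessageSet.lastOffset ms)
      (ConsumerMessageSet.lag ms) })
|> Async.RunSynchronously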


// commit consumer offsets explicitly

Consumer.commitOffsets consumer [| 0, 1L |]
|> Async.RunSynchronously

// commit consumer offsets explicitly to a relative time

Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously
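
// or commit to the latest offsets with the same call

Consumer.commitOffsetsToTime consumer Time.LatestOffset
|> Async.RunSynchronously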


// get current consumer state

let consumerState = 
  Consumer.state consumer
  |> Async.RunSynchronously

printfn "generation_id=%i member_id=%s leader_id=%s assignment_stratgey=%s partitions=%A" 
  consumerState.generationId consumerState.memberId consumerState.leaderId 
  consumerState.assignmentStrategy consumerState.assignments 



// fetch offsets of a consumer group for all topics

let consumerOffsets =
  Consumer.fetchOffsets conn "consumer-group" [||]
  |> Async.RunSynchronously

for (t,os) in consumerOffsets do
  for (p,o) in os do
    printfn "topic=%s partition=%i offset=%i" t p o


// fetch topic offset information

let offsets = 
  Offsets.offsets conn "absurd-topic" [] [ Time.EarliestOffset ; Time.LatestOffset ] 1
  |> Async.RunSynchronously

for kvp in offsets do
  for (tn,offsets) in kvp.Value.topics do
    for p in offsets do
      printfn "time=%i topic=%s partition=%i offsets=%A" kvp.Key tn p.partition p.offsets

Contributing and copyright

The project is hosted on GitHub where you can report issues, fork the project, and submit pull requests. If you're adding a new public API, please also consider adding samples that can be turned into documentation. You might also want to read the library design notes to understand how it works.

The library is available under Apache 2.0. For more information see the License file in the GitHub repository.
