SQL Server & Kafka

Im Laufe meiner Arbeitskarriere bin ich immer wieder auf 1 Problem gestoßen: Komponente A hat einen State, andem andere Komponenten interessiert sind. Ich hab schon die unterschiedlichsten Lösungen gesehen:

  • DataTables mit RowChanged und anschließend über .NET Remoting Events raus
  • WCF Events
  • .NET Sync-Framework

Auch habe ich im Laufe meiner Arbeitskarriere immer wieder die gleichen Probleme gesehen:

  • Bug gemeldet: State in Komponenten X passt nicht: erwartet wurde State 1 - aber State ist 2. Wie kam es zu State 2? Was waren die vorhergehenden States? Als Entwickler kann man jetzt die Log durchforsten und sich die Stimuli von Komponenten X “raussuchen” und hoffen, dass man es irgendwie nachstellen kann. Aber im Regelfall kann man nur mit der Schulter zucken - vermutlich Events durcheinander gekommen.
  • Bug gemeldet: “komisches Verhalten” - es scheinen Events zu fehlen. Das durchschauen der Logs lenkt den Verdacht auf Verbindungsverluste / Service-Neustarts / …
  • Das System steht: Self-Denial Attacks [1] ist der Klassiker. Eine Zentrale Komponente, bei der sich andere Komponenten registrieren. Startet diese neu, kommt es meist zu einem Neu-Subscriben - oft umgesetzt mit Snapshot-Delta Strategie. Das belastest dieses zentrale Komponente enorm. Ab und zu ist der Betreuer des Systems dazu gezwungen, Services manuell “langsam” zu starten, damit die zentrale Komponenten nicht “versagt”.

Ich hab daraus drei Dinge gelernt:

  1. Nachvollziehbarkeit ist alles. Man sollte den Event-Stream persistieren - man ist dann beim Nachvollziehen wenigstens in der Lage, die Stimuli lokal am Entwicklerrechner nochmal “abzuspielen” und hat dann gute Chancen, einen Fehler zu finden.
  2. Der Event Stream kann mehrfach verwendet werden. Man kann den Stream z.B. in einen Log-Compacted Stream weiterleiten oder in einen anderen Stream, der Daten historisch korrekt speichert.
  3. Der Event-Stream kann von Komponente A (der zentralen Komponente z.B.) entkoppelt werden. D.h., es kommt zu einer Entlastung, da der Komponenten im Großen und Ganzen egal ist, wie viele Interessenten es gibt.

Man stößt da relativ schnell auf eine etablierte Technologie: https://kafka.apache.org/. Es gibt auch inzwischen viele Alternativen (e.g. https://pulsar.apache.org/, https://nats.io/documentation/streaming/nats-streaming-intro/)- aber die Libraries Rund um Kafka sind gut.

Die nächste Herausforderung ist natürlich die Daten sinnvoll in die Queue / Topic / whatever zu schreiben. So kann im Code folgendes passieren:

void Foo()
{
    using(var transaction = connection.BeginTransaction())
    { }

    using(var transaction = connection.BeginTransaction())
    { }
    
    Bar();
}

void Bar()
{
    using(var transaction = connection.BeginTransaction())
    { }
}

Gut oder nicht - die “Business Transaction” muss nicht immer einer SQL Server Transaction entsprechen. Auch kann Foo parallel aufgerufen werden. Die Frage ist, wie man die Reihenfolge der Transaktionen wieder herstellen kann. Ich hab dazu ein kleines Beispiel konstruiert.

  • Es werden mehrere Bitmaps zeilenweise ausgelesen und in SQL Server gespeichert. (Tabelle 1: Storage ID (PK), ImageId, X, Y Tabelle 2: StorageId (FK), Red, Green, Blue)
  • Die Bilder sollen anschließend wieder aus SQL Server rekonstruiert werden. Dabei ist die Reihenfolge entscheidend: es soll wieder zeilenweise geschrieben werden.

Umgesetzt wurde das ganze wie folgt: ImageReader -> SQL Server -> SQL Server WAL / CDC -> Kafka -> ImageWriter

Michael Vodep

Nachdem die Daten in die Datenbank geschrieben wurden, gibt es nur 1 Wahrheit, in welcher Reihenfolge die Transaktionen abgehandelt wurden. Diese befinden sich in der WAL (Write Ahead Log) und werden durch CDC (Change Data Capture) aufbereitet:

Michael Vodep

ImageReader

Verwendet wurde Dapper, um die Daten in SQL Server zu speichern. Die beiden Tabellen schauen wie folgt aus:

CREATE TABLE storage
(
    storageId INT IDENTITY(1,1) PRIMARY KEY,
    imageId INT NOT NULL,
    x INT NOT NULL,
    y INT NOT NULL,
    UNIQUE(imageId, x, y)
);

CREATE TABLE rgb
(    
    storageId INT NOT NULL FOREIGN KEY REFERENCES storage(storageId) PRIMARY KEY, 
    red INT NOT NULL,
    green INT NOT NULL,
    blue INT NOT NULL    
);

Will man die WAL von SQL Server anschauen, geht das mit

SELECT * FROM fn_dblog(null, null)

Man wird allerdings schnell erkennen, dass diese schwer zu verarbeiten geht. CDC geht einen Schritt weiter und ist ein SQL Agent Job (wird zwar nur 1 Mal ausgeführt, läuft aber alle X Sekunden), welcher die Daten schön aufbereitet: man bekommt alten / neuen Wert und sehr wichtig: die LSN. Da CDC pro Tabelle “arbeitet”, kann man mit der LSN allerdings jederzeit die Reihenfolge der Datensätze wiederherstellen.

Anschließend schaltet man das CDC ein.

SELECT is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1 AND [name] = 'storage'

IF @@ROWCOUNT = 0
    EXEC sys.sp_cdc_enable_table 
        @source_schema = N'dbo', 
        @source_name   = N'storage', 
        @role_name     = NULL 

SELECT is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1 AND [name] = 'rgb'

IF @@ROWCOUNT = 0
    EXEC sys.sp_cdc_enable_table 
        @source_schema = N'dbo', 
        @source_name   = N'rgb', 
        @role_name     = NULL 

GO

Mit

SELECT name, is_tracked_by_cdc FROM sys.tables
SELECT capture_instance FROM cdc.change_tables
SELECT * from sys.dm_cdc_log_scan_sessions ORDER BY start_time DESC

kann man kontrollieren, ob alles passt. CDC rennt alle @pollinginterval Sekunden und liest @maxtrans und das @maxscans Mal

EXECUTE sys.sp_cdc_change_job   
    @job_type = N'capture',  
	@continuous = 1,
	@maxtrans = 1000,
    @maxscans = 100,  
    @pollinginterval = 1;

EXEC sys.sp_cdc_stop_job @job_type = 'capture';
EXEC sys.sp_cdc_start_job @job_type = 'capture';

Wichtig: Man muss das neueste Update (https://support.microsoft.com/en-ie/help/4466404/cumulative-update-13-for-sql-server-2017) installieren - mit SQL Server 2017 RC hatte ich massive Probleme mit CDC.

Das Einlesen der Bilder ist straight forward:

private static async Task Iterate(FileInfo image, Func<IDbConnection, IDbTransaction, int, int, Task> callback)
{
    var bitmap = new Bitmap(image.FullName);
 
    using (IDbConnection connection = new SqlConnection(@"XXXX"))
    {
        connection.Open();
 
        for (int y = 0; y < bitmap.Height; y++)
        {
            for (int x = 0; x < bitmap.Width; x++)
            {
                // Small transaction = "more work" for CDC / Kafka
                using (var transation = connection.BeginTransaction())
                {
                    await callback(connection, transation, x, y);
 
                    transation.Commit();
                }
            }
        }
    }
}

private static async Task WriteImageData(MyImage imageFile)
{
    var bitmap = new Bitmap(imageFile.File.FullName);
 
    await Iterate(imageFile.File, async (connection, transation, x, y) =>
    {
        var color = bitmap.GetPixel(x, y);
 
        var imageSql = @"INSERT INTO [dbo].[storage] (imageId, x, y) VALUES(@ImageId, @X, @Y); SELECT CAST(SCOPE_IDENTITY() AS INT);";
 
        var storageId = await connection.QueryAsync<int>(imageSql, new { ImageId = imageFile.ImageId, x, y }, transaction: transation);
 
        var pixelSql = @"INSERT INTO [dbo].[rgb] (storageId, red, green, blue) VALUES(@StorageId, @Red, @Green, @Blue);";
 
        await connection.ExecuteAsync(pixelSql, new { StorageId = storageId, Red = color.R, Green = color.G, Blue = color.B }, transaction: transation);
    });
}

Man kann auch vorher noch ein paar Runden Noise reinhauen, damit man die Log-Compaction von Kafka testen kann. Unter den Systemtabellen > [cdc].[dbo_rgb_CT] bzw. [cdc].[dbo_storage_CT] sieht man den Output von CDC.

SQL Server nach Kafka

Kafka kann man auch unter Windows starten. Man muss in Zookeeper und Kafka die Pfade in den Windows Style ändern und schon läuft es.

Was sind jetzt die Vorteile von Kafka in unserem Beispiel?

  • Kafka ist sehr performant, weil es Nachrichten immer nur appenden muss und auch Zero-Copy Kram (zumindest unter Linux)
  • Man kann den Event-Stream beliebig oft ausgelesen
  • Gibt man den Nachrichten einen Key, sokann man von jedem Key immer den letzten Wert behalten und alte Werte entfernen lassen (log-Compaction)

Folgender Code fragt alle 5 Sekunden nach Änderungen in den CDC Tables und speichert sie nach Kafka:

private static void HandleTimerElapsed(object sender, ElapsedEventArgs e)
{
    Timer.Enabled = false;
 
    var storageQuery = @"DECLARE @min_lsn binary(10), @from_lsn binary(10), @to_lsn binary(10);                 
        SET @to_lsn = sys.fn_cdc_get_max_lsn(); 
        SET @min_lsn = sys.fn_cdc_get_min_lsn ('dbo_storage'); 
        SET @from_lsn = sys.fn_cdc_increment_lsn(@SavedLsn); 
 
        SELECT @to_lsn AS toLsn;
         
        IF(@from_lsn < @min_lsn)
            SET @from_lsn = @min_lsn;
 
        IF (@from_lsn > @to_lsn) 
            SET @from_lsn = @to_lsn;               
         
        SELECT [__$start_lsn] AS startLsn, [__$seqval] AS seqVal, [__$operation] AS operation, [storageId], [imageId], [x], [y] FROM cdc.fn_cdc_get_all_changes_dbo_storage(@from_lsn, @to_lsn, 'all');";
 
    var rgbQuery = @"DECLARE @min_lsn binary(10), @from_lsn binary(10), @to_lsn binary(10);                 
        SET @to_lsn = sys.fn_cdc_get_max_lsn(); 
        SET @min_lsn = sys.fn_cdc_get_min_lsn ('dbo_rgb'); 
        SET @from_lsn = sys.fn_cdc_increment_lsn(@SavedLsn); 
 
        SELECT @to_lsn AS toLsn;
         
        IF(@from_lsn < @min_lsn)
            SET @from_lsn = @min_lsn;
 
        IF (@from_lsn > @to_lsn) 
            SET @from_lsn = @to_lsn;               
         
        SELECT [__$start_lsn] AS startLsn, [__$seqval] AS seqVal, [__$operation] AS operation, [storageId], [red], [green], [blue] FROM cdc.fn_cdc_get_all_changes_dbo_rgb(@from_lsn, @to_lsn, 'all');";
 
    using (IDbConnection connection = new SqlConnection(@"XXX"))
    {
        connection.Open();
 
        using (var reader = connection.QueryMultiple(storageQuery, new { SavedLsn = lastStorageToLsn }))
        {
            var range = reader.ReadSingle<dynamic>();
            var changes = reader.Read<dynamic>();
            var changesAsRecords = changes.Select(i => new StorageRecord(i.startLsn, i.seqVal, i.operation, i.storageId, i.imageId, i.x, i.y));
 
            lastStorageToLsn = range.toLsn;
 
            StorageRecords.AddRange(changesAsRecords);
        }
 
        using (var reader = connection.QueryMultiple(rgbQuery, new { SavedLsn = lastRgbToLsn }))
        {
            var range = reader.ReadSingle<dynamic>();
            var changes = reader.Read<dynamic>();
            var changesAsRecords = changes.Select(i => new RgbRecord(i.startLsn, i.seqVal, i.operation, i.storageId, i.red, i.green, i.blue));
 
            lastRgbToLsn = range.toLsn;
 
            RgbRecords.AddRange(changesAsRecords);
        }
    }
 
    var mergedResult = MergeSortedLists(StorageRecords, RgbRecords);
 
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
        EnableIdempotence = true
    };
 
    Action<DeliveryReportResult<string, byte[]>> handler = r =>
    {
        if (r.Error.IsError)
        {
            Console.WriteLine(r.Error.Reason);
        }
    };
 
    var policy = Policy
        .Handle<KafkaException>(exception => exception.Error.Code == ErrorCode.Local_QueueFull)
        .WaitAndRetryForever(i => TimeSpan.FromSeconds(5));
 
    using (var p = new Producer<string, byte[]>(config))
    {
        foreach (var item in mergedResult)
        {
            var key = string.Empty;
 
            if (item is StorageRecord)
            {
                var storageRecord = (StorageRecord)item;
 
                key = $"storage/{storageRecord.ImageId}/{storageRecord.X}/{storageRecord.Y}";
            }
 
            if (item is RgbRecord)
            {
                var rgbRecord = (RgbRecord)item;
 
                key = $"rgb/{rgbRecord.StorageId}";
            }
 
            policy.Execute(() =>
            {
                p.BeginProduce("test", new Message<string, byte[]>
                {
                    Key = key,
                    Value = Serialize(item)
                }, handler);
            });
 
            Interlocked.Increment(ref processedItems);
        }
 
        p.Flush(TimeSpan.FromSeconds(10));
    }
 
    Console.WriteLine("Iteration finished!");
 
    StorageRecords.Clear();
    RgbRecords.Clear();
 
    Timer.Enabled = true;
}

Leider hat die Confluent Kafka Schnittstelle für .NET aus meiner Sicht einen Nachteil - Back-Preassure muss man leider über Exceptions abfangen (siehe https://github.com/confluentinc/confluent-kafka-dotnet/issues/772). Mit Polly kann man eine Fehlerbehandlung relativ gut umsetzen.

Interessant ist auch noch, dass die CDC Tables einen Clustered-Index mit der LSN an erster Stelle haben - somit sollte die Abfrage nach “gib mir das nächste” effizient sein. Zusätzlich kann ich die globale Reihenfolge sehr leicht wiederherstellen, da ich die beiden (vorsortierten Listen) sehr effizient in O(n) mergen kann. Die Sequence Number ist das zweite Kriterium für die Sortierung, weil es die Reihenfolge innerhalb der Transaktion angibt.

public static IList<Record> MergeSortedLists(IList<Record> list1, IList<Record> list2)
{
    var resultList = new List<Record>(list1.Count + list2.Count);
    var comparer = new ByteComparer();

    int list1Index = 0, list2Index = 0;

    for (int i = 0; i < list1.Count + list2.Count; i++)
    {
        if (list2Index >= list2.Count)
        {
            if (list1.Any())
            {
                resultList.Add(list1[list1Index++]);
            }
        }

        if (list1Index >= list1.Count)
        {
            if (list2.Any())
            {
                resultList.Add(list2[list2Index++]);
            }
        }

        if (list1Index < list1.Count && list2Index < list2.Count)
        {
            if (comparer.Compare(list1[list1Index].Lsn, list2[list2Index].Lsn) < 0)
            {
                resultList.Add(list1[list1Index++]);
            }
            else if (comparer.Compare(list1[list1Index].Lsn, list2[list2Index].Lsn) > 0)
            {
                resultList.Add(list2[list2Index++]);
            }
            else
            {
                if (comparer.Compare(list1[list1Index].Sequence, list2[list2Index].Sequence) < 0)
                {
                    resultList.Add(list1[list1Index++]);
                }
                else
                {
                    resultList.Add(list2[list2Index++]);
                }
            }
        }
    }

    return resultList;
}

Wichtig bei Kafka ist die Option “EnableIdempotence” (siehe https://github.com/edenhill/librdkafka/issues/623#issuecomment-427324349). Damit hat man “bessere” Garantien bezüglich der Reihenfolge / Retries von Messages.

Auslesen aus Kafka

Funktioniert straight forward. Zusätzlich hab ich überprüft, ob die Reihenfolge noch passt.

Fazit

Mit CDC und Kafka hat man eine relativ einfache Lösung, um Daten aus der Datenbank an andere Systeme weiter zu liefern. Confluent bietet auch fertige Lösungen an - allerdings nicht gratis.

[1] Release It!, 2nd Edition Design and Deploy Production-Ready Software