Kategorien
Digitale Transformation Implementierung

Lambda Architektur: Umsetzung mit Apache Spark

Lambda Architektur Einführung

In der heutigen Zeit in welcher Stillstand schon als Rückschritt gewertet wird, kommt es gerade in der Wirtschaft darauf an schneller auf Trends zu reagieren und die richtigen Schlüsse daraus zu ziehen. Aus diesem Grund werden für Entscheidungsprozesse nicht nur Daten aus klassischen Datenbanken herangezogen, die einmal am Tag oder besser: über Nacht ihre Daten an die Folgesysteme weiterleiten, sondern Daten aus unterschiedlichsten Quellen wie soziale Medien, Logdateien, Bilder, Sensordaten, usw. genutzt. Das heißt nicht nur die Heterogenität der Daten hat zugenommen, sondern auch die Umschlagsgeschwindigkeit und damit auch Geschwindigkeit, in der reagiert werden muss. Wenn zum Beispiel ein Bankautomat einen Defekt oder kein Bargeld mehr hat, sollte dies eine zeitnahe Maßnahme nach sich ziehen, um die Kundenzufriedenheit hoch zu halten. Moderne IT- Architekturen müssen diesen veränderten Umständen Rechnung tragen.

Die Lambda Architektur zählt im Kontext von Big Data Szenarien als eine häufig benutzte Form der Architektur in IT- Systemlandschaften, wenn es darum geht die Anforderungen von zwei unterschiedlichen Nutzergruppen unter einen Hut zu bekommen. Auf der einen Seite stehen Nutzer, die seit jeher Daten von hoher Qualität verarbeiten und auswerten müssen. Diese sind meist noch mit zusätzlichen, berechneten Kennzahlen angereichert. Diese „klassischen“ Nutzer brauchen die Daten stichtagsbezogen für Fachbereiche wie das Meldewesen, Accounting, Risk oder Controlling. Auf der anderen Seite stehen Nutzer mit einem kurzfristigen Informationsbedürfnis, die schnell auf Ereignisse reagieren müssen. Dies kann der defekte Geldautomat für den Wartungstechniker sein, aber auch der nächste Boykottaufruf für ein bestimmtes Unternehmen in den sozialen Medien für einen Aktienhändler.

Abbildung 1: Lambda Architektur Überblick

Wie kann man diese unterschiedlichen Nutzer auf einer Plattform vereinen?

Die Lambda Architektur erreicht dies durch die Verwendung von zwei unterschiedlichen Layern. Zum einen mit einem Batch Layer, der für Konsistenz sorgt. Diese wird durch das wichtige Prinzip der Unveränderlichkeit der Daten erreicht. D.h. die Quelldaten werden nie verändert, sondern es werden nur Kopien erzeugt und gespeichert. Die unveränderlichen Ursprungsinformationen diesen als Berechnungsbasis für weitere KPIs, welche die Quelldaten als neue Kopie ergänzen. Dadurch ist eine saubere Trennung von Quelldaten und abgeleiteten, berechneten Daten gewährleistet. Dieser Layer ist vor allem für die klassischen Nutzer von Bedeutung.

Der Speed Layer ist vor allem für die Real Time Analyse von Daten wichtig. Er soll das vergleichsweise große Zeitfenster, bis Daten aus dem Batch Layer zur Verfügung stehen, schließen. Es werden nur die neuesten Daten aus den Vorsystemen verarbeitet und ggf. weitere KPIs on the fly berechnet und in Echtzeit zur Verfügung gestellt. Die berechneten Kennzahlen, sind meist nur eine Teilmenge der Kalkulation des Batch Layers. Sobald die Berechnung im Batch Layer zu einem späterene Zeitpunkt abgeschlossen ist, werden die fehlenden Kennzahlen im Serving Layer ergänzt. Ziel des Speed Layers ist es zeitnah ein vorläufiges Bild auf Kosten von Vollständigkeit und Genauigkeit zur Verfügung zu stellen.

Im Serving Layer können sich beide Nutzergruppen entsprechend ihrer Anforderungen aus einem oder beiden Layern ihre Reports zusammenstellen.

Moderne Big Data Architekturen der Finanzindustrie oder Industrie 4.0 arbeiten oft mit einer Lambda Architektur. Hier werden Streamingquellen (Sensordaten, Internet of Things oder Change Data Captures aus Datenbanken) für den Speed Layer angezapft und ausgewertet. Meist wird der Speed Layer noch mit Machine Learning Methoden ausgestattet, um Daten effektiver und automatisiert auswerten zu können. Der Batch Layer wird aus relevanten Vorsystemen und ERP Systemen gespeist. Aufgrund der Heterogenität der Datenformate wird häufig ein Data Lake für die Speicherung benutzt. Die Daten beider Layer stehen dort für unterschiedliche Applikationen wie Machine Learning oder Reporting zur Verfügung.

Abbildung 2: Verwendung einer Lambda Architektur mit Machine Learning

Vorteile der Lambda Architektur

Man bekommt als Nutzer auf einer Plattform ein vollständiges Bild für seine Abfragen. Aus den Batch Views erhält man einen umfangreichen Satz an berechneten Kennzahlen von hoher Qualität und aus dem Speed Layer Informationen über die Echtzeitlage, um nahe am aktuellen Geschehen sein zu können. Da Real Time Daten schnell an Wert verlieren, werden die Daten mit dem nächsten Lauf aus dem Batch Layer mit ggf. zusätzlich berechneten Kennzahlen im Serving Layer ersetzt. Durch die Unveränderbarkeit der Quelldaten im Batch Layer ist es möglich durch eine Neuprozessierung der Daten neue Reports zu erstellen (wenn nötig auch auf historischen Datenständen), falls sich die Berechnungsmethodik einzelner Kennzahlen ändert. Ein weiterer Vorteil der Lambda Architektur ist ihre Skalierbarkeit. Die Architektur selbst gibt keine Technologie vor, wird aber häufig im Big Data Kontext eingesetzt, weswegen verteilte Systeme verwendet werden, die gut horizontal skalieren (scale out). Hier bieten sich daher insbesondere Tools aus dem Hadoop Universum an.

Für den Batch Layer bieten sich z.B. Tools wie Apache Pig, Apache Spark für die Prozessierung und Hive, Impala, PostgreSQL oder auch HDFS für eine Persistenz an. Der Speed Layer kann beispielsweise aus Apache Storm, Kafka oder Spark Streaming mit entsprechender Ablage wie Cassandra, HBase oder Mongo DB bestehen.

Implementierung einer Lambda Architektur

Die gerade erläuterte Lambda Architektur stellt die Grundlage für den Aufbau unseres Demo-ETL-Systems dar. Ziel dieses Systems soll das Testen und Ausprobieren von Tools der Cloudera CDH Plattform und deren Schnittstellen sein, um damit ein Minimalbeispiel einer Lambda-ETL-Strecke zu realisieren. Dabei lag der Fokus auf Apache Spark, einem Framework für Cluster-Computing.

 Aufgrund des enormen Umfangs des Hadoop Ökosystems, mit seinen unzähligen Komponenten, haben wir uns bei der Umsetzung auf ein paar wenige, aber häufig anzutreffende Tools konzentriert: namentlich Hive, Spark und Kafka. Als Vorsystem dient ein SAP Bank Analyzer 9 auf einer HANA Datenbank.

Abbildung 3: Datenflüsse in der umgesetzten Lambda Architektur

Umgesetzt wurde ein Business-Use-Case zum Abzinsen von Zahlungsströmen. Diese Zahlungsströme werden als Bestandteil von Finanzgeschäften im SAP Bank Analyzer in der HANA DB abgelegt. Von dort gelangen sie mit Hilfe von Spark in das Hadoop System, wo sie anschließend abgezinst werden. Dazu benötigt man aktuelle Marktdaten, welche hier aber nicht manuell eingepflegt und aktualisiert, sondern über eine öffentliche API aus dem Internet bezieht.  Für sogenannte „Open Data“ bietet sich die REST Schnittstelle der Europäischen Zentralbank zu ihrem Statistical Data Warehouse an, über welche eine Vielzahl an Marktdaten und -raten bezogen werden können. In dem hier besprochenen Beispiel waren es EURIBOR Geldmarktsätze und EUROYIELD Kapitalmarktsätze für die Barwertberechnung des Zahlungsstromes. Der abgezinste Zahlungsstrom wird danach in einer Hive Tabelle im HDFS abgelegt.  Dabei wird jedoch nur das Delta der Sätze geschrieben, sprich ein Datensatz wird nur dann abgespeichert, wenn dieser noch nicht in der Tabelle existiert oder sich aktualisiert hat. Anderenfalls wird das Ergebnis ausgefiltert und nicht in Hive persistiert.

Die technische Umsetzung erfolgt mittels zwei verschiedener Java Programme und selbstgeschriebener Spark-Java-Bibliotheken. Obwohl Spark als unsere Haupt-API in Scala geschrieben ist, haben wir Java verwendet. Unsere Erfahrungen mit Kunden haben gezeigt, dass diese aufgrund der besseren Verfügbarkeit von Entwicklern und der größeren Verbreitung, bevorzugt Java einsetzen.

Im Folgenden soll nun ein detaillierterer Blick auf die einzelnen Komponenten der ETL-Strecke geworfen werden. Das Programm (1) zum Laden der Marktdaten erhält über einen REST Aufruf JSON-Dateien vom ECB Statistical Data Warehouse. Diese werden anschließend geparst, um die relevanten Daten zu extrahieren und neu zu bündeln. Mittels der Kafka-Java-API wird ein Kafka-Producer implementiert, welcher die Daten als JSON-String formatiert in ein Kafka-Topic (2) schreibt. Da nur jeweils die letzte aktuelle Version der Marktdaten benötigt werden, bietet sich ein solches Topic als einfach zu bedienender Key-Value Store an.

Man kann diesen Schritt selbstverständlich auch direkt in Spark erledigen und auch auf das Zwischenspeichern der Daten in Kafka Topics verzichten. Jedoch war der Fokus möglichst viele Schnittstellen mit einem einfachen Use-Case zu testen. Zusätzlich ist auf diese Weise die Nachvollziehbarkeit von älteren Berechnungen sichergestellt.

Das Hauptprogramm zum Laden der Cashflows (3) wurde mit Hilfe der Spark-Java-API entwickelt. Dafür entstanden zwei Versionen des Programms, eine für die Stream- und eine zweite für die Batch-Verarbeitung. Dank der Möglichkeit Spark-Streaming über die Triggereinstellung „One-Time-Micro-Batch“ auch für Batch Verarbeitungen nutzen zu können, hält sich hierbei der Implementierungs- und Wartungsaufwand in Grenzen. Der Großteil des Codes kann so für beide Fälle benutzt werden. Der Verarbeitungsmodus wird einfach nach Bedarf über eine Konfigurations-Datei ausgewählt. Eine solche Einzelverarbeitung bringt alle bekannten Vorteile der Spark-Streaming-Bibliothek, wie etwa der automatischen Wiederherstellung der Query im Falle eines unbeabsichtigten Systemshutdowns oder -absturzes von erstellten Checkpoints mit sich. Zudem bleiben aber auch alle Vorteile der Batch Verarbeitung bestehen, wie z.B. das Reduzieren von Kosten durch gezieltes Hoch- und Runterfahren des Clusters.

Mittels der Spark-API wird auf die HANA-Datenbank (4) zugegriffen und der jeweils neueste Satz abgezogen. Die Erkennung läuft dabei über eine Spalte mit einer fortlaufenden ganzen Zahl vom Datentyp Long, welche aus dem Timestamp des Datensatzes generiert wird. Dieser Umweg musste gegangen werden, da der SAP- Zeitstempel in diesem Fall nicht kompatibel mit dem Spark- Zeitstempel ist. Das Laden geschieht dann in sog. Microbatchanfragen, welche in bestimmten Zeitintervallen auf die HANA DB abgesetzt werden und über Abfrage der eben beschriebenen Zahl, alle Daten seit dem letzten Microbatch abholen. Dieser Vorgang, wird dabei von Spark automatisch erledigt und verwaltet. Im Falle eines herkömmlichen Spark- Batchabzugs, würde man alle Daten ab dem letzten verarbeiteten Zeitstempel abziehen, müsste diese dann jedoch selbst verwalten und abspeichern. Die Spark-Streaming-API erledigt dies, wie bereits erläutert, automatisch mit Hilfe der Checkpoint-Dateien.

Die neuesten Marktdaten werden direkt mittels der Spark-Kafka-Implementierung aus dem bereits erwähnten Kafka- Topic geladen (5) und der FTP- Bibliothek für das Abzinsen der Cashflows bereitgestellt. Diese interpoliert die Stützstellen der Zinskurve auf die Daten des Cashflows und zinst diesen entsprechend ab. Der daraus resultierende Dataframe wird danach mit Hilfe einer Delta- Library auf Veränderung zu bereits in Hive verfügbaren Datensätzen untersucht und gegebenenfalls gefiltert. Dazu werden die Inhalte der relevanten Felder gehasht und mit den Werten in der Zieltabelle verglichen. Bei Übereinstimmung wird dann die entsprechende Zeile aus dem Dataframe gefiltert. Das Ergebnis mitsamt der Hash -Werten wird am Ende von Spark in eine partitionierte Hive Tabelle (6) geschrieben. Die Partitionierung nach Monat und Jahr hilft dabei die Performance beim Lesen der Daten für den Deltaabgleich möglichst hoch zu halten.

Für zeitkritische Pipelines würden sich normalerweise Key-Value Stores wie etwa HBASE besser eignen, jedoch wäre so eine Umsetzung der Delta-Logik nicht effizient möglich gewesen. Dieser POC zeigt aber gut, dass es über Microbatches möglich ist, sowohl Daten mittels einer herkömmlichen JDBC-Verbindung von HANA zu streamen als auch in Hive Tabellen zu schreiben.

Warum haben wir uns nun für eine Lambda-Architektur entschieden, obwohl sich doch die Programme, bis auf die Art des Datenabzugs nicht unterscheiden? Man sich gut vorstellen, dass in einer weiteren Ausbaustufe, neben dem Abzinsen zum Beispiel noch zusätzlich die FTP-Raten berechnet werden oder es werden weitere tiefergehende Analysen oder Aggregationen durchgeführt. Diese wären dann entweder zu zeitaufwändig oder, wie im Falle der Aggregationen, gar nicht möglich im Streaming-Verfahren. Man könnte dies jedoch in die Batchverarbeitung einfließen lassen. Somit hätte man trotzdem weiterhin seinen abgezinsten Cashflow in nahezu Echtzeit und zusätzlich weitere (aggregierte) Daten zum Analysieren am nächsten Tag.

Fazit und Ausblick

Die oben beschriebene Lambda Architektur ist bereits weit verbreitet und hat sich in verschiedenen Industriezweigen, auch im Finanzdienstleistungssektor, bewährt. Viele Banken und Versicherungen befassen sich mit dieser und vergleichbaren Architekturmustern, um ihre Landschaften im Hinblick auf Agilität, Skalierbarkeit, Geschwindigkeit und natürlich Kostenerwägungen (TCO) zu optimieren.

Allerdings ist ein genauerer Blick auf den jeweiligen individuellen Kontext erforderlich. Eine Unterscheidung zwischen den jeweils unterschiedlichen funktionalen und technischen Anwendungsbereichen dieser Architekturmuster ist ratsam.

Die Frage, die sich für uns noch stellt, ist ob es neben der Umsetzung einer Lambda Architektur noch weitere Möglichkeiten gibt und wie aufwendig deren Implementierung ist. Damit haben wir uns im Blogeintrag „Wechsel von Lambda zur Delta Architektur“ beschäftigt.

Weitere Architekturansätze und verwandte Themen finden sich unter anderem in diesem Buch: The Digital Journey of Banking and Insurance, Volume III – Data Storage, Data Processing and Data Analysis | Volker Liermann | Springer

Literatur

Akhgarnush, Eljar, Lars Broeckers, and Thorsten Jakoby. „Hadoop – a standard framework for computer clusters.“ In The impact of digital transformation and fintech on the finance professional, by Volker Liermann and Claus Stegmann. New York: Palgrave Macmillan, 2019.

Kopic, Eva, Bezu Teschome, Thomas Schneider, Ralph Steurer, and Sascha Florin. „In-memory databases and their impact on our (future) organizations.“ In The impact of digital transformation and fintech on the finance professiona, by Volker Liermann and Claus Stegmann „. New York: Palgrave Macmillan, 2019.

Director

Michael Morawski arbeitet seit 2008 bei der ifb group. Er führte seitdem Projekte im In- und Ausland durch, hauptsächlich im Zusammenhang mit Regulatory Reporting & Controlling, oft mit ETL-Pipelines in verschiedenen Systemen wie SAP Bank Analyzer, SAP Business Warehouse, SAP HANA, SAP FPSL und Hadoop. Er leitet die Hadoop- Arbeitsgruppe im ifb.