So implementieren Sie Echtzeit-Datenstreaming in Python

So Implementieren Sie Echtzeit Datenstreaming In Python



Die Beherrschung der Implementierung von Echtzeit-Datenstreaming in Python ist in der heutigen datenintensiven Welt eine wesentliche Fähigkeit. In diesem Leitfaden werden die wichtigsten Schritte und wesentlichen Tools zur Nutzung des Echtzeit-Datenstreamings mit Authentizität in Python erläutert. Von der Auswahl eines passenden Frameworks wie Apache Kafka oder Apache Pulsar bis hin zum Schreiben eines Python-Codes für mühelose Datennutzung, -verarbeitung und effektive Visualisierung erwerben wir die erforderlichen Fähigkeiten, um die agilen und effizienten Echtzeit-Datenkanäle aufzubauen.

Beispiel 1: Implementierung von Echtzeit-Datenstreaming in Python

Die Implementierung eines Echtzeit-Datenstreamings in Python ist im heutigen datengesteuerten Zeitalter und in der heutigen Welt von entscheidender Bedeutung. In diesem detaillierten Beispiel werden wir den Prozess des Aufbaus eines Echtzeit-Daten-Streaming-Systems mit Apache Kafka und Python in Google Colab durchgehen.







Um das Beispiel zu initialisieren, bevor wir mit dem Codieren beginnen, ist der Aufbau einer spezifischen Umgebung in Google Colab unerlässlich. Als erstes müssen wir die notwendigen Bibliotheken installieren. Für die Kafka-Integration verwenden wir die Bibliothek „kafka-python“.



! Pip Installieren Kafka-Python


Dieser Befehl installiert die „kafka-python“-Bibliothek, die die Python-Funktionen und die Bindungen für Apache Kafka bereitstellt. Als nächstes importieren wir die benötigten Bibliotheken für unser Projekt. Der Import der erforderlichen Bibliotheken, einschließlich „KafkaProducer“ und „KafkaConsumer“, sind die Klassen aus der „kafka-python“-Bibliothek, die es uns ermöglichen, mit Kafka-Brokern zu interagieren. JSON ist die Python-Bibliothek für die Arbeit mit den JSON-Daten, die wir zum Serialisieren und Deserialisieren der Nachrichten verwenden.



aus Kafka-Import KafkaProducer, KafkaConsumer
json importieren


Schaffung eines Kafka-Produzenten





Dies ist wichtig, da ein Kafka-Produzent die Daten an ein Kafka-Thema sendet. In unserem Beispiel erstellen wir einen Produzenten, um simulierte Echtzeitdaten an ein Thema namens „real-time-topic“ zu senden.

Wir erstellen eine „KafkaProducer“-Instanz, die die Adresse des Kafka-Brokers als „localhost:9092“ angibt. Dann verwenden wir den „value_serializer“, eine Funktion, die die Daten serialisiert, bevor sie sie an Kafka sendet. In unserem Fall kodiert eine Lambda-Funktion die Daten als UTF-8-kodiertes JSON. Lassen Sie uns nun einige Echtzeitdaten simulieren und an das Kafka-Thema senden.



Produzent = KafkaProduzent ( Bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( In ) .kodieren ( 'utf-8' ) )
# Simulierte Echtzeitdaten
Daten = { 'sensor_id' : 1 , 'Temperatur' : 25.5 , 'Feuchtigkeit' : 60.2 }
# Daten an das Thema senden
Produzent.send ( 'Echtzeit-Thema' , Daten )


In diesen Zeilen definieren wir ein „Daten“-Wörterbuch, das simulierte Sensordaten darstellt. Anschließend verwenden wir die Methode „Senden“, um diese Daten im „Echtzeit-Topic“ zu veröffentlichen.

Dann wollen wir einen Kafka-Konsumenten erstellen und ein Kafka-Konsumer liest die Daten aus einem Kafka-Thema. Wir erstellen einen Verbraucher, um die Nachrichten im „Echtzeit-Thema“ zu konsumieren und zu verarbeiten. Wir erstellen eine „KafkaConsumer“-Instanz und geben das Thema an, das wir nutzen möchten, z. B. (real-time-topic), und die Adresse des Kafka-Brokers. Dann ist „value_deserializer“ eine Funktion, die die von Kafka empfangenen Daten deserialisiert. In unserem Fall dekodiert eine Lambda-Funktion die Daten als UTF-8-kodiertes JSON.

Verbraucher = KafkaConsumer ( 'Echtzeit-Thema' ,
Bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Wir verwenden eine iterative Schleife, um die Nachrichten aus dem Thema kontinuierlich zu konsumieren und zu verarbeiten.

# Echtzeitdaten lesen und verarbeiten
für Nachricht In Verbraucher:
data = message.value
drucken ( F „Empfangene Daten: {data}“ )


Wir rufen den Wert jeder Nachricht und unsere simulierten Sensordaten innerhalb der Schleife ab und geben sie an die Konsole aus. Zum Ausführen des Kafka-Produzenten und -Konsumenten gehört die Ausführung dieses Codes in Google Colab und die individuelle Ausführung der Codezellen. Der Produzent sendet die simulierten Daten an das Kafka-Thema, und der Verbraucher liest und druckt die empfangenen Daten.


Analyse der Ausgabe während der Codeausführung

Wir werden Echtzeitdaten beobachten, die produziert und verbraucht werden. Das Datenformat kann je nach unserer Simulation oder tatsächlichen Datenquelle variieren. In diesem detaillierten Beispiel decken wir den gesamten Prozess der Einrichtung eines Echtzeit-Daten-Streaming-Systems mit Apache Kafka und Python in Google Colab ab. Wir erklären jede Codezeile und ihre Bedeutung beim Aufbau dieses Systems. Echtzeit-Datenstreaming ist eine leistungsstarke Funktion und dieses Beispiel dient als Grundlage für komplexere reale Anwendungen.

Beispiel 2: Implementierung eines Echtzeit-Datenstreamings in Python unter Verwendung von Börsendaten

Lassen Sie uns ein weiteres einzigartiges Beispiel für die Implementierung eines Echtzeit-Datenstreamings in Python anhand eines anderen Szenarios erstellen. Dieses Mal konzentrieren wir uns auf Börsendaten. Wir erstellen ein Echtzeit-Daten-Streaming-System, das die Aktienkursänderungen erfasst und sie mithilfe von Apache Kafka und Python in Google Colab verarbeitet. Wie im vorherigen Beispiel gezeigt, beginnen wir mit der Konfiguration unserer Umgebung in Google Colab. Zuerst installieren wir die benötigten Bibliotheken:

! Pip Installieren kafka-python yfinance


Hier fügen wir die „yfinance“-Bibliothek hinzu, die es uns ermöglicht, Börsendaten in Echtzeit zu erhalten. Als nächstes importieren wir die notwendigen Bibliotheken. Wir verwenden weiterhin die Klassen „KafkaProducer“ und „KafkaConsumer“ aus der „kafka-python“-Bibliothek für die Kafka-Interaktion. Wir importieren JSON, um mit den JSON-Daten zu arbeiten. Wir nutzen auch „yfinance“, um Börsendaten in Echtzeit zu erhalten. Wir importieren auch die „time“-Bibliothek, um eine Zeitverzögerung hinzuzufügen, um die Echtzeitaktualisierungen zu simulieren.

aus Kafka-Import KafkaProducer, KafkaConsumer
json importieren
import yfinance als yf
importieren Zeit


Jetzt erstellen wir einen Kafka-Produzenten für Bestandsdaten. Unser Kafka-Produzent erhält Echtzeit-Aktiendaten und sendet sie an ein Kafka-Thema namens „Aktienkurs“.

Produzent = KafkaProduzent ( Bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( In ) .kodieren ( 'utf-8' ) )

während WAHR:
stock = yf.Ticker ( „AAPL“ ) # Beispiel: Aktie von Apple Inc
stock_data = stock.history ( Zeitraum = „1d“ )
last_price = stock_data [ 'Schließen' ] .iloc [ - 1 ]
Daten = { 'Symbol' : „AAPL“ , 'Preis' : letzter Preis }
Produzent.send ( 'standard Preis' , Daten )
Zeit.Schlaf ( 10 ) # Simulieren Sie Echtzeitaktualisierungen alle 10 Sekunden


In diesem Code erstellen wir eine „KafkaProducer“-Instanz mit der Adresse des Kafka-Brokers. Innerhalb der Schleife verwenden wir „yfinance“, um den aktuellen Aktienkurs für Apple Inc. („AAPL“) zu erhalten. Dann extrahieren wir den letzten Schlusskurs und senden ihn an das Thema „Aktienkurs“. Schließlich führen wir eine Zeitverzögerung ein, um die Echtzeitaktualisierungen alle 10 Sekunden zu simulieren.

Erstellen wir einen Kafka-Konsumenten, um die Aktienkursdaten aus dem Thema „Aktienkurs“ zu lesen und zu verarbeiten.

Verbraucher = KafkaConsumer ( 'standard Preis' ,
Bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

für Nachricht In Verbraucher:
stock_data = message.value
drucken ( F „Erhaltene Bestandsdaten: {stock_data['symbol']} – Preis: {stock_data['price']}“ )


Dieser Code ähnelt dem Consumer-Setup des vorherigen Beispiels. Es liest und verarbeitet kontinuierlich die Nachrichten aus dem Thema „Aktienkurs“ und gibt das Aktiensymbol und den Preis auf der Konsole aus. Wir führen die Codezellen nacheinander aus, z. B. eine nach der anderen in Google Colab, um den Producer und Consumer auszuführen. Der Produzent erhält und sendet die Aktienkursaktualisierungen in Echtzeit, während der Verbraucher diese Daten liest und anzeigt.

! Pip Installieren kafka-python yfinance
aus Kafka-Import KafkaProducer, KafkaConsumer
json importieren
import yfinance als yf
importieren Zeit
Produzent = KafkaProduzent ( Bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( In ) .kodieren ( 'utf-8' ) )

während WAHR:
stock = yf.Ticker ( „AAPL“ ) # Apple Inc.-Aktie
stock_data = stock.history ( Zeitraum = „1d“ )
last_price = stock_data [ 'Schließen' ] .iloc [ - 1 ]

Daten = { 'Symbol' : „AAPL“ , 'Preis' : letzter Preis }

Produzent.send ( 'standard Preis' , Daten )

Zeit.Schlaf ( 10 ) # Simulieren Sie Echtzeitaktualisierungen alle 10 Sekunden
Verbraucher = KafkaConsumer ( 'standard Preis' ,
Bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

für Nachricht In Verbraucher:
stock_data = message.value
drucken ( F „Erhaltene Bestandsdaten: {stock_data['symbol']} – Preis: {stock_data['price']}“ )


Bei der Analyse der Ausgabe nach der Ausführung des Codes werden wir beobachten, wie die Aktienkursaktualisierungen für Apple Inc. in Echtzeit produziert und verbraucht werden.

Abschluss

In diesem einzigartigen Beispiel haben wir die Implementierung von Echtzeit-Datenstreaming in Python mithilfe von Apache Kafka und der „yfinance“-Bibliothek zur Erfassung und Verarbeitung der Börsendaten demonstriert. Wir haben jede Zeile des Codes ausführlich erklärt. Echtzeit-Datenstreaming kann auf verschiedene Bereiche angewendet werden, um reale Anwendungen im Finanzwesen, IoT und mehr zu erstellen.