Visualisera Confluent-data i Amazon QuickSight med Amazon Athena

Visualisera Confluent-data i Amazon QuickSight med Amazon Athena

Källnod: 2035835

Det här är ett gästinlägg skrivet av Ahmed Saef Zamzam och Geetha Anne från Confluent.

Företag använder dataströmmar i realtid för att få insikter om företagets prestanda och fatta välgrundade, datadrivna beslut snabbare. Eftersom realtidsdata har blivit avgörande för företag, anpassar ett växande antal företag sin datastrategi för att fokusera på data i rörelse. Händelseströmning är det centrala nervsystemet i en data i rörelse-strategi och i många organisationer, Apache Kafka är verktyget som driver den.

Idag är Kafka välkänt och flitigt använt för att streama data. Men att hantera och driva Kafka i stor skala kan fortfarande vara utmanande. Konfluenta erbjuder en lösning genom sin fullt hanterade, molnbaserade tjänst som förenklar drift och drift av dataströmmar i stor skala. Confluent utökar Kafka med öppen källkod genom en uppsättning relaterade tjänster och funktioner utformade för att förbättra upplevelsen av data i rörelse för operatörer, utvecklare och arkitekter i produktionen.

I det här inlägget visar vi hur Amazonas Athena, Amazon QuickSight, och Confluent arbetar tillsammans för att möjliggöra visualisering av dataströmmar i nästan realtid. Vi använder Kafka-kontakten i Athena för att göra följande:

  • Sammanfoga data inuti Confluent med data som lagras i en av de många datakällor som stöds av Athena, som t.ex. Amazon enkel lagringstjänst (Amazon S3)
  • Visualisera Confluent-data med QuickSight

Utmaningar

Specialbyggda strömbearbetningsmotorer, som Sammanflytande ksqlDB, tillhandahåller ofta SQL-liknande semantik för realtidstransformationer, kopplingar, aggregationer och filter på strömmande data. Med ksqlDB kan du skapa ihållande frågor, som kontinuerligt bearbetar strömmar av händelser enligt specifik logik, och materialiserar strömmande data i vyer som kan frågas vid en tidpunkt (dra frågor) eller prenumererade på av kunder (push-frågor).

ksqlDB är en lösning som gjorde strömbehandling tillgänglig för ett större antal användare. Men pull-frågor, som de som stöds av ksqlDB, kanske inte är lämpliga för alla användningsfall för strömbearbetning, och det kan finnas komplexitet eller unika krav som pull-frågor inte är designade för.

Datavisualisering för Confluent data

Ett vanligt användningsfall för företag är datavisualisering. För att visualisera data lagrad i Confluent kan du använda en av över 120 förbyggda kontakter, som tillhandahålls av Confluent, för att skriva strömmande data till en destinationsdatalager som du väljer. Därefter ansluter du ditt Business Intelligence-verktyg (BI) till datalagret för att börja visualisera data.

Följande diagram visar en typisk arkitektur som används av många Confluent-kunder. I detta arbetsflöde skrivs data till Amazon S3 genom Confluent S3 diskbänkskontakt och analyseras sedan med Athena, en serverlös interaktiv analystjänst som gör att du kan analysera och söka efter data lagrad i Amazon S3 och olika andra datakällor med standard SQL. Du kan sedan använda Athena som en indatakälla till QuickSight, en mycket skalbar molnbaserad BI-tjänst, för vidare analys.

typisk arkitektur som används av många Confluent-kunder.

Även om detta tillvägagångssätt fungerar bra för många användningsfall, kräver det att data flyttas, och därför dupliceras, innan det kan visualiseras. Denna dubblering lägger inte bara till tid och ansträngning för dataingenjörer som kan behöva utveckla och testa nya skript, utan skapar också dataredundans, vilket gör det mer utmanande att hantera och säkra data, och ökar lagringskostnaderna.

Berika data med referensdata i ett annat datalager

Med ksqlDB-frågor är källan och destinationen alltid Kafka-ämnen. Därför, om du har en dataström som du behöver berika med extern referensdata, har du två alternativ. Ett alternativ är att importera referensdata till Confluent, modellera det som en tabell och använda ksqlDB:s strömtabellsanslutning för att berika strömmen. Det andra alternativet är att mata in dataströmmen i ett separat datalager och utföra kopplingsoperationer där. Båda kräver dataflyttning och resulterar i dubblettdatalagring.

Lösningsöversikt

Hittills har vi diskuterat två utmaningar som inte hanteras av konventionella strömbearbetningsverktyg. Finns det en lösning som löser båda utmaningarna samtidigt?

När du vill analysera data utan separata pipelines och jobb är Athena ett populärt val. Med Athena kan du köra SQL-frågor på en brett utbud av datakällor— förutom Amazon S3 — utan att lära sig ett nytt språk, utveckla skript för att extrahera (och duplicera) data eller hantera infrastruktur.

Nyligen meddelade Athena en kontakt för Kafka. Precis som Athenas andra kopplingar, bearbetas frågor om Kafka inom Kafka och returnerar resultat till Athena. Anslutningen stöder predikat-pushdown, vilket innebär att om du lägger till filter i dina frågor kan du minska mängden data som skannas, förbättra frågeprestanda och minska kostnaderna.

Till exempel, när du använder denna anslutning, mängden data som skannas av frågan SELECT * FROM CONFLUENT_TABLE kan vara betydligt högre än mängden data som skannas av frågan SELECT * FROM CONFLUENT_TABLE WHERE COUNTRY = 'UK'. Anledningen är att AWS Lambda funktion som tillhandahåller runtime-miljön för Athena-anslutningen, filtrerar data vid källan innan den returneras till Athena.

Låt oss anta att vi har en ström av onlinetransaktioner som flödar in i Confluent och kundreferensdata lagrade i Amazon S3. Vi vill använda Athena för att sammanfoga båda datakällorna och producera en ny datauppsättning för QuickSight. Istället för att använda S3 sink-kontakten för att ladda data till Amazon S3, använder vi Athena för att fråga Confluent och sammanfoga den med S3-data – allt utan att flytta data. Följande diagram illustrerar denna arkitektur.

Athena förenar båda datakällorna och producerar en ny datauppsättning för QuickSight

Vi utför följande steg:

  1. Registrera schemat för dina Confluent-data.
  2. Konfigurera Athena-kontakten för Kafka.
  3. Alternativt kan du interaktivt analysera Confluent-data.
  4. Skapa en QuickSight-datauppsättning med Athena som källa.

Registrera schemat

För att ansluta Athena till Confluent måste kopplingen schemat för ämnet registreras i AWS Glue Schema Registry, som Athena använder för frågeplanering.

Följande är en exempelpost i Confluent:

{ "transaction_id": "23e5ed25-5818-4d4f-acb3-73ef04d51d21", "customer_id": "126-58-9758", "amount": 986, "timestamp": "2023-01-03T15:40:42", "product_category": "health_fitness"
}

Följande är schemat för denna post:

{ "topicName": "transactions", "message": { "dataFormat": "json", "fields": [ { "name": "transaction_id", "mapping": "transaction_id", "type": "VARCHAR" }, { "name": "customer_id", "mapping": "customer_id", "type": "VARCHAR" }, { "name": "amount", "mapping": "amount", "type": "INTEGER" }, { "name": "timestamp", "mapping": "timestamp", "type": "timestamp", "formatHint": "yyyy-MM-dd'T'HH:mm:ss" }, { "name": "product_category", "mapping": "product_category", "type": "VARCHAR" }, { "name": "customer_id", "mapping": "customer_id", "type": "VARCHAR" } ] }
}

Dataproducenten som skriver data kan registrera detta schema med AWS Glue Schema Registry. Alternativt kan du använda AWS Management Console or AWS-kommandoradsgränssnitt (AWS CLI) till skapa ett schema manuellt.

Vi skapar schemat manuellt genom att köra följande CLI-kommando. Byta ut med ditt registernamn och se till att texten i beskrivningsfältet innehåller den obligatoriska strängen {AthenaFederationKafka}:

aws glue create-registry –registry-name <registry_name> --description {AthenaFederationKafka}

Därefter kör vi följande kommando för att skapa ett schema i det nyskapade schemaregistret:

aws glue create-schema –registry-id RegistryName=<registry_name> --schema-name <schema_name> --compatibility <Compatibility_Mode> --data-format JSON –schema-definition <Schema>

Innan du kör kommandot, se till att ange följande information:

  • ersätta med vårt AWS Glue Schema Registry-namn
  • ersätta med namnet på vårt Confluent Cloud-ämne, till exempel transaktioner
  • ersätta <Kompatibilitetsläge> med en av de som stöds kompatibilitetslägent.ex. "Bakåt"
  • ersätta med vårt schema

Konfigurera och distribuera Athena Connector

Med vårt schema skapat är vi redo att distribuera Athena-anslutningen. Slutför följande steg:

  1. Välj på Athena-konsolen Datakällor i navigeringsfönstret.
  2. Välja Skapa datakälla.
  3. Sök efter och välj Apache Kafka.
    Lägg till Apache Kafka som datakälla
  4. För Datakällans namnanger du namnet på datakällan.
    Ange namn för datakällan

Det här datakällans namn kommer att refereras till i dina frågor. Till exempel:

SELECT * FROM <data_source_name>.<registry_name>.<schema_name>
WHERE COL1='SOMETHING'

Om vi ​​tillämpar detta på vårt användningsfall och tidigare definierade schema, skulle vår fråga vara som följer:

SELECT * FROM "Confluent"."transactions_db"."transactions"
WHERE product_category='Kids'

  1. I Anslutningsdetaljer avsnitt väljer Skapa Lambda-funktion.
    skapa lambdafunktion

Du omdirigeras till Applikationer sida på lambdakonsolen. Vissa av applikationsinställningarna är redan ifyllda.

Följande är de viktiga inställningar som krävs för att integrera med Confluent Cloud. För mer information om dessa inställningar, se parametrar.

  1. För LambdaFunktionsnamn, ange namnet på den lambdafunktion som kontakten ska använda. Till exempel, athena_confluent_connector.

Vi använder denna parameter i nästa steg.

  1. För Kafka Endpoint, ange Confluent Cloud bootstrap URL.

Du hittar detta på Klusterinställningar sida i Confluent Cloud UI.

ange Confluent Cloud bootstrap URL

Confluent Cloud stöder två autentiseringsmekanismer: OAuth och SASL/PLAIN (API-nycklar). Anslutningen stöder inte OAuth; detta lämnar oss med SASL/PLAIN. SASL/PLAIN använder SSL som säkerhetsprotokoll och PLAIN som SASL-mekanism.

  1. För AuthType, stiga på SASL_SSL_PLAIN.

API-nyckeln och hemligheten som används av anslutaren för att komma åt Confluent måste lagras i AWS Secrets Manager.

  1. Skaffa din Confluent API-nyckel eller skapa en ny.
  2. Kör följande AWS CLI-kommando för att skapa hemligheten i Secrets Manager:
    aws secretsmanager create-secret --name <SecretNamePrefix> --secret-string "{"username":"<Confluent_API_KEY>","password":"<Confluent_Secret>"}"

Den hemliga strängen ska ha två nyckel-värdepar, varav ett namnges username och den andra password.

  1. För SecretNamePrefix, ange det hemliga namnprefixet som skapades i föregående steg.
  2. Om Confluent-molnklustret är tillgängligt över internet, lämna SecurityGroupIds och SubnetIds tom. Annars måste din Lambda-funktion köras i en VPC som har anslutning till ditt Confluent Cloud-nätverk. Ange därför ett säkerhetsgrupp-ID och tre privata subnät-ID:n i denna VPC.
  3. För SpillBucket, ange namnet på en S3-hink där kontakten kan spilla data.

Athena-kontakter lagrar tillfälligt (Spill) data till Amazon S3 för vidare bearbetning av Athena.

  1. Välja Jag är medveten om att den här appen skapar anpassade IAM-roller och resurspolicyer.
  2. Välja Distribuera.
  3. Återgå till Anslutningsdetaljer avsnitt om Athena-konsolen och för Lambda, ange namnet på Lambda-funktionen du skapade.
  4. Välja Nästa.
    Återgå till avsnittet Anslutningsdetaljer på Athena-konsolen och för Lambda anger du namnet på Lambdafunktionen du skapade. Och välj Nästa.
  5. Välja Skapa datakälla.

Utför interaktiv analys av konfluenta data

Med Athena-anslutningen konfigurerad är vår streamingdata nu sökbar från samma tjänst som vi använder för att analysera S3-datasjöar. Därefter använder vi Athena för att utföra punkt-i-tidsanalys av transaktioner som flödar genom Confluent Cloud.

aggregation

Vi kan använda standard SQL-funktioner för att aggregera data. Till exempel kan vi få intäkterna per produktkategori:

SELECT product_category, SUM(amount) AS Revenue
FROM "Confluent"."athena_blog"."transactions"
GROUP BY product_category
ORDER BY Revenue desc

SQL-funktion för att samla data

Berika transaktionsdata med kunddata

Aggregeringsexemplet är också tillgängligt med ksqlDB pull-frågor. Men Athenas anslutning tillåter oss att sammanfoga data med andra datakällor som Amazon S3.

I vårt användningsfall saknar transaktionerna som streamas till Confluent Cloud detaljerad information om kunder, förutom en customer_id. Däremot har vi en referensdatauppsättning i Amazon S3 som har mer information om kunderna. Med Athena kan vi sammanfoga båda dataseten för att få insikter om våra kunder. Se följande kod:

SELECT * FROM "Confluent"."athena_blog"."transactions" a
INNER JOIN "AwsDataCatalog"."athenablog"."customer" b ON a.customer_id=b.customer_id

ansluta data

Du kan se från resultaten att vi kunde berika streamingdata med kunddetaljer, lagrade i Amazon S3, inklusive namn och adress.

Visualisera data med QuickSight

En annan kraftfull funktion som denna anslutning ger är möjligheten att visualisera data lagrad i Confluent med hjälp av vilket BI-verktyg som helst som stöder Athena som datakälla. I det här inlägget använder vi QuickSight. QuickSight är en maskininlärning (ML)-driven BI-tjänst byggd för molnet. Du kan använda den för att leverera lättförståeliga insikter till personerna du arbetar med, var de än befinner sig.

För mer information om hur du registrerar dig för QuickSight, se Registrera dig för en Amazon QuickSight-prenumeration.

Slutför följande steg för att visualisera din strömmande data med QuickSight:

  1. Välj på QuickSight-konsolen dataset i navigeringsfönstret.
  2. Välja Nytt datasätt.
  3. Välj Athena som datakälla.
  4. För Datakällans namn, ange ett namn.
  5. Välja Skapa datakälla.
  6. I Välj ditt bord avsnitt väljer Använd anpassad SQL.
    I avsnittet Välj din tabell väljer du Använd anpassad SQL.
  7. Ange kopplingsfrågan som den som gavs tidigare och välj sedan Bekräfta frågan.
    Ange kopplingsfrågan som den som gavs tidigare och välj sedan Bekräfta fråga.
  8. Välj sedan att importera data till KRYDDA (Supersnabb, Parallell, In-Memory Calculation Engine), en fullständigt hanterad minnescache som ökar prestandan, eller fråga direkt efter data.

Användning av SPICE kommer att förbättra prestandan, men data kan behöva uppdateras med jämna mellanrum. Du kan välja att uppdatera din datauppsättning stegvis or schemalägg regelbundna uppdateringar med SPICE. Om du vill att data i nästan realtid ska återspeglas i dina instrumentpaneler väljer du Fråga din data direkt. Observera att med alternativet för direktfrågningar kan användaråtgärder i QuickSight, som att tillämpa ett detaljerat filter, anropa en ny Athena-fråga.

  1. Välja visualisera.
    Välj Visualisera

Det var allt, vi har framgångsrikt kopplat QuickSight till Confluent genom Athena. Med bara några klick kan du skapa några bilder som visar data från Confluent.

lyckades koppla QuickSight till Confluent genom Athena.

Städa upp

För att undvika att ådra sig pågående avgifter, radera de resurser du tillhandahållit genom att utföra följande steg:

  1. Ta bort AWS Glue-schemat och registret.
  2. Ta bort Athena Kafka-kontakten.
  3. Ta bort QuickSight-datauppsättningen.

Slutsats

I det här inlägget diskuterade vi användningsfall för Athena och Confluent. Vi gav exempel på hur du kan använda både för nästan realtidsdatavisualisering med QuickSight och interaktiv analys som involverar kopplingar mellan strömmande data i Confluent och data lagrad i Amazon S3.

Athena-kontakten för Kafka förenklar processen med att söka och analysera strömmande data från Confluent Cloud. Det tar bort behovet av att först flytta strömmande data till beständig lagring innan den kan användas i nedströms användningsfall som business intelligence. Detta kompletterar den befintliga integrationen mellan Confluent och Athena, med hjälp av S3 sink-kontakten, som möjliggör laddning av strömmande data i en datasjö, och är ett ytterligare alternativ för kunder som vill möjliggöra interaktiv analys av Confluent-data.


Om författarna

Ahmed Zamzam är Senior Partner Solutions Architect på Confluent, med fokus på AWS-partnerskapet. I sin roll arbetar han med kunder i EMEA-regionen i olika branscher för att hjälpa dem att bygga applikationer som utnyttjar deras data med Confluent och AWS. Före Confluent var Ahmed en Specialist Solutions Architect för Analytics AWS specialiserad på dataströmning och sökning. På fritiden tycker Ahmed om att resa, spela tennis och cykla.

Geetha Anne är en Partner Solutions Engineer på Confluent med tidigare erfarenhet av att implementera lösningar för datadrivna affärsproblem i molnet, som involverar datalagring och realtidsströmningsanalys. Hon blev förälskad i distribuerad datoranvändning under sin grundutbildning och har följt hennes intresse sedan dess. Geetha tillhandahåller teknisk vägledning, designrådgivning och tankeledarskap till viktiga Confluent-kunder och partners. Hon tycker också om att lära ut komplexa tekniska koncept för både teknikkunniga och allmänna publiker.

Tidsstämpel:

Mer från AWS Big Data