Hur SOCAR hanterar stora IoT-data med Amazon MSK och Amazon ElastiCache för Redis

Hur SOCAR hanterar stora IoT-data med Amazon MSK och Amazon ElastiCache för Redis

Källnod: 2086570

Det här är ett gästblogginlägg som skrivits tillsammans med SangSu Park och JaeHong Ahn från SOCAR. 

När företag fortsätter att utöka sitt digitala fotavtryck kan vikten av databearbetning och analys i realtid inte överskattas. Förmågan att snabbt mäta och dra insikter från data är avgörande i dagens affärslandskap, där snabbt beslutsfattande är nyckeln. Med denna förmåga kan företag ligga steget före och utveckla nya initiativ som driver framgång.

Detta inlägg är en fortsättning på Hur SOCAR byggde en strömmande datapipeline för att bearbeta IoT-data för realtidsanalys och kontroll. I det här inlägget ger vi en detaljerad översikt över strömmande meddelanden med Amazon Managed Streaming för Apache Kafka (Amazon MSK) och Amazon ElastiCache för Redis, som täcker tekniska aspekter och designöverväganden som är väsentliga för att uppnå optimala resultat.

SOCAR är det ledande koreanska mobilitetsföretaget med stark konkurrenskraft inom bildelning. SOCAR ville designa och bygga en lösning för ett nytt Fleet Management System (FMS). Detta system involverar insamling, bearbetning, lagring och analys av Internet of Things (IoT) strömmande data från olika fordonsenheter, såväl som historiska driftsdata som plats, hastighet, bränslenivå och komponentstatus.

Det här inlägget visar en lösning för SOCARs produktionsapplikation som låter dem ladda strömmande data från Amazon MSK till ElastiCache för Redis, vilket optimerar hastigheten och effektiviteten i deras databehandlingspipeline. Vi diskuterar också lösningens nyckelfunktioner, överväganden och design.

Bakgrund

SOCAR driver cirka 20,000 XNUMX bilar och planerar att inkludera andra stora fordonstyper som kommersiella fordon och budbilar. SOCAR har distribuerat enheter i bilen som fångar data med hjälp av AWS IoT Core. Dessa data lagrades sedan i Amazon Relational Databas Service (Amazon RDS). Utmaningen med detta tillvägagångssätt inkluderade ineffektiv prestanda och hög resursanvändning. Därför letade SOCAR efter specialbyggda databaser skräddarsydda för deras applikations- och användningsmönster samtidigt som de mötte de framtida kraven för SOCARs affärsmässiga och tekniska krav. Nyckelkraven för SOCAR inkluderade att uppnå maximal prestanda för dataanalys i realtid, vilket krävde lagring av data i ett datalager i minnet.

Efter noggrant övervägande valdes ElastiCache för Redis som den optimala lösningen på grund av dess förmåga att hantera komplexa regler för dataaggregering med lätthet. En av utmaningarna var att ladda data från Amazon MSK till databasen, eftersom det inte fanns någon inbyggd Kafka-kontakt och konsument tillgänglig för denna uppgift. Det här inlägget fokuserar på utvecklingen av en Kafka-konsumentapplikation som designades för att tackla denna utmaning genom att möjliggöra prestandaladdade data från Amazon MSK till Redis.

Lösningsöversikt

Att extrahera värdefulla insikter från strömmande data kan vara en utmaning för företag med olika användningsfall och arbetsbelastningar. Det är därför SOCAR byggde en lösning för att sömlöst överföra data från Amazon MSK till flera specialbyggda databaser, samtidigt som det ger användarna möjlighet att omvandla data efter behov. Med fullt hanterad Apache Kafka tillhandahåller Amazon MSK en pålitlig och effektiv plattform för att inta och bearbeta realtidsdata.

Följande figur visar ett exempel på dataflödet vid SOCAR.

lösningsöversikt

Denna arkitektur består av tre komponenter:

  • Strömmande data – Amazon MSK fungerar som en skalbar och pålitlig plattform för strömmande data, som kan ta emot och lagra meddelanden från en mängd olika källor, inklusive AWS IoT Core, med meddelanden organiserade i flera ämnen och partitioner
  • Konsumentapplikation – Med en konsumentapplikation kan användare sömlöst överföra data från Amazon MSK till en måldatabas eller datalagring samtidigt som de definierar regler för datatransformation efter behov
  • Måldatabaser – Med konsumentapplikationen kunde SOCAR-teamet ladda data från Amazon MSK till två separata databaser, som var och en betjänar en specifik arbetsbelastning

Även om det här inlägget fokuserar på ett specifikt användningsfall med ElastiCache för Redis som måldatabas och ett enda ämne som kallas gps, kan konsumentapplikationen vi beskriver hantera ytterligare ämnen och meddelanden, såväl som olika streamingkällor och måldatabaser som t.ex. Amazon DynamoDB. Vårt inlägg täcker de viktigaste aspekterna av konsumentapplikationen, inklusive dess funktioner och komponenter, designöverväganden och en detaljerad guide till kodimplementeringen.

Komponenter i konsumentapplikationen

Konsumentapplikationen består av tre huvuddelar som arbetar tillsammans för att konsumera, transformera och ladda meddelanden från Amazon MSK till en måldatabas. Följande diagram visar ett exempel på datatransformationer i hanterarkomponenten.

konsument-applikation

Detaljerna för varje komponent är som följer:

  • konsumenten – Detta förbrukar meddelanden från Amazon MSK och vidarebefordrar sedan meddelandena till en nedströmshanterare.
  • Loader – Det är här användarna anger en måldatabas. Till exempel inkluderar SOCARs måldatabaser ElastiCache för Redis och DynamoDB.
  • Handler – Det är här användare kan tillämpa regler för datatransformation på de inkommande meddelandena innan de laddas in i en måldatabas.

Funktioner i konsumentapplikationen

Denna anslutning har tre funktioner:

  • skalbarhet – Den här lösningen är designad för att vara skalbar, vilket säkerställer att konsumentapplikationen kan hantera en ökande mängd data och rymma ytterligare applikationer i framtiden. Till exempel sökte SOCAR att utveckla en lösning som kan hantera inte bara aktuell data från cirka 20,000 XNUMX fordon utan också en större volym av meddelanden eftersom verksamheten och data fortsätter att växa snabbt.
  • prestanda – Med denna konsumentapplikation kan användare uppnå konsekvent prestanda, även när mängden källmeddelanden och måldatabaser ökar. Applikationen stöder multithreading, vilket möjliggör samtidig databehandling och kan hantera oväntade toppar i datavolymen genom att enkelt öka beräkningsresurserna.
  • Flexibilitet – Denna konsumentapplikation kan återanvändas för alla nya ämnen utan att behöva bygga hela konsumentapplikationen igen. Konsumentapplikationen kan användas för att mata in nya meddelanden med olika konfigurationsvärden i hanteraren. SOCAR distribuerade flera hanterare för att mata in många olika meddelanden. Dessutom tillåter denna konsumentapplikation användare att lägga till ytterligare målplatser. Till exempel utvecklade SOCAR initialt en lösning för ElastiCache för Redis och replikerade sedan konsumentapplikationen för DynamoDB.

Designöverväganden för konsumentapplikationen

Observera följande designöverväganden för konsumentapplikationen:

  • Skala ut – En viktig designprincip för denna lösning är skalbarhet. För att uppnå detta körs konsumentapplikationen med Amazon Elastic Kubernetes-tjänst (Amazon EKS) eftersom det kan tillåta användare att öka och replikera konsumentapplikationer enkelt.
  • Konsumtionsmönster – För att ta emot, lagra och konsumera data effektivt är det viktigt att utforma Kafka-ämnen beroende på budskap och konsumtionsmönster. Beroende på meddelanden som konsumeras i slutet, kan meddelanden tas emot i flera ämnen i olika scheman. Till exempel har SOCAR många olika ämnen som konsumeras av olika arbetsbelastningar.
  • Specialbyggd databas – Konsumentapplikationen stöder laddning av data till flera målalternativ baserat på det specifika användningsfallet. Till exempel lagrade SOCAR IoT-data i realtid i ElastiCache för Redis för att driva instrumentpanelen och webbapplikationer i realtid, samtidigt som den senaste reseinformationen lagrades i DynamoDB som inte krävde realtidsbearbetning.

Walkthrough-översikt

Tillverkaren av denna lösning är AWS IoT Core, som skickar ut meddelanden till ett ämne som kallas gps. Måldatabasen för denna lösning är ElastiCache för Redis. ElastiCache for Red är en snabb minneslagring som tillhandahåller fördröjning på under millisekunder för att driva realtidsapplikationer i internetskala. Byggd på Redis med öppen källkod och kompatibel med Redis API:er, kombinerar ElastiCache for Redis hastigheten, enkelheten och mångsidigheten hos Redis med öppen källkod med hanterbarheten, säkerheten och skalbarheten från Amazon för att driva de mest krävande realtidsapplikationerna.

Målplatsen kan vara antingen en annan databas eller lagring beroende på användningsfall och arbetsbelastning. SOCAR använder Amazon EKS för att driva den containeriserade lösningen för att uppnå skalbarhet, prestanda och flexibilitet. Amazon EKS är en hanterad Kubernetes-tjänst för att köra Kubernetes i AWS-molnet. Amazon EKS hanterar automatiskt tillgängligheten och skalbarheten för Kubernetes kontrollplansnoder som ansvarar för schemaläggning av containrar, hantering av applikationstillgänglighet, lagring av klusterdata och andra nyckeluppgifter.

För programmeringsspråket bestämde sig SOCAR-teamet för att använda programmeringsspråket Go och använda både AWS SDK för Go och en Goroutine, en lättviktig logisk eller virtuell tråd som hanteras av Go runtime, vilket gör det enkelt att hantera flera trådar. AWS SDK för Go förenklar användningen av AWS-tjänster genom att tillhandahålla en uppsättning bibliotek som är konsekventa och bekanta för Go-utvecklare.

I följande avsnitt går vi igenom stegen för att implementera lösningen:

  1. Skapa en konsument.
  2. Skapa en lastare.
  3. Skapa en hanterare.
  4. Bygg en konsumentapplikation med konsumenten, lastaren och hanteraren.
  5. Implementera konsumentapplikationen.

Förutsättningar

För denna genomgång bör du ha följande:

Skapa en konsument

I det här exemplet använder vi ett ämne som heter gps, och konsumenten inkluderar en Kafka-klient som tar emot meddelanden från ämnet. SOCAR skapade en struktur och byggde en konsument (kallad NewConsumer i koden) för att göra den förlängningsbar. Med detta tillvägagångssätt kan alla ytterligare parametrar och regler enkelt läggas till.

För att autentisera med Amazon MSK använder SOCAR IAM. Eftersom SOCAR redan använder IAM för att autentisera andra resurser, som Amazon EKS, använder den samma IAM-roll (aws_msk_iam_v2) för att autentisera klienter för både Amazon MSK- och Apache Kafka-åtgärder.

Följande kod skapar konsumenten:

type Consumer struct { logger *zerolog.Logger kafkaReader *kafka.Reader
} func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer { return &Consumer{ logger: logger, kafkaReader: kafka.NewReader(kafka.ReaderConfig{ Dialer: &kafka.Dialer{ TLS: &tls.Config{MinVersion: tls.VersionTLS12}, Timeout: 10 * time.Second, DualStack: true, SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg), }, Brokers: brokers, // GroupID: consumerGroupID, // Topic: topic, // StartOffset: kafka.LastOffset, // }), }
} func (consumer *Consumer) Close() error { var err error = nil if consumer.kafkaReader != nil { err = consumer.kafkaReader.Close() consumer.logger.Info().Msg("closed kafka reader") } return err
} func (consumer *Consumer) Consume(ctx context.Context) (kafka.message, error) { return consumer.kafkaReader.Readmessage(ctx)
}

Skapa en lastare

Lastarfunktionen, representerad av Loader struct, är ansvarig för att ladda meddelanden till målplatsen, som i det här fallet är ElastiCache för Redis. De NewLoader funktionen initierar en ny instans av Loader struct med en logger och en Redis-klusterklient, som används för att kommunicera med ElastiCache-klustret. De redis.NewClusterClient objekt initieras med hjälp av NewRedisClient funktion, som använder IAM för att autentisera klienten för Redis-åtgärder. Detta säkerställer säker och auktoriserad åtkomst till ElastiCache-klustret. Loader-strukturen innehåller också Close-metoden för att stänga Kafka-läsaren och frigöra resurser.

Följande kod skapar en loader:

type Loader struct { logger *zerolog.Logger redisClient *redis.ClusterClient
} func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader { return &Loader{ logger: logger, redisClient: redisClient, }
} func (consumer *Consumer) Close() error { var err error = nil if consumer.kafkaReader != nil { err = consumer.kafkaReader.Close() consumer.logger.Info().Msg("closed kafka reader") } return err
} func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) { return consumer.kafkaReader.ReadMessage(ctx)
} func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) { redisClient := redis.NewClusterClient(&redis.ClusterOptions{ NewClient: func(opt *redis.Options) *redis.Client { return redis.NewClient(&redis.Options{ Addr: opt.Addr, CredentialsProvider: func() (username string, password string) { token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username) if err != nil { panic(err) } return opt.Username, token }, PoolSize: opt.PoolSize, PoolTimeout: opt.PoolTimeout, TLSConfig: &tls.Config{InsecureSkipVerify: true}, }) }, Addrs: addrs, Username: username, PoolSize: 100, PoolTimeout: 1 * time.Minute, }) pong, err := redisClient.Ping(ctx).Result() if err != nil { return nil, err } if pong != "PONG" { return nil, fmt.Errorf("failed to verify connection to redis server") } return redisClient, nil
}

Skapa en hanterare

En hanterare används för att inkludera affärsregler och datatransformationslogik som förbereder data innan den laddas in på målplatsen. Den fungerar som en brygga mellan en konsument och en lastare. I det här exemplet är ämnesnamnet cars.gps.json, och meddelandet innehåller två nycklar, lng och lat, med datatyp Float64. Affärslogiken kan definieras i en funktion som handlerFuncGpsToRedis och tillämpade sedan enligt följande:

type ( handlerFunc func(ctx context.Context, loader *Loader, key, value []byte) error handlerFuncMap map[string]handlerFunc
) var HandlerRedis = handlerFuncMap{ "cars.gps.json": handlerFuncGpsToRedis
} func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) { handlerFunc, exist := funcMap[topic] if !exist { return nil, fmt.Errorf("failed to find handler func for '%s'", topic) } return handlerFunc, nil
} func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error { // unmarshal raw data to map data := map[string]interface{}{} err := json.Unmarshal(value, &data) if err != nil { return err } // prepare things to load on redis as geolocation name := string(key) lng, err := getFloat64ValueFromMap(data, "lng") if err != nil { return err } lat, err := getFloat64ValueFromMap(data, "lat") if err != nil { return err } // add geolocation to redis return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}

Bygg en konsumentapplikation med konsumenten, lastaren och hanteraren

Nu har du skapat konsumenten, lastaren och hanteraren. Nästa steg är att bygga en konsumentapplikation med hjälp av dem. I en konsumentapplikation läser du meddelanden från din stream med en konsument, omvandlar dem med en hanterare och laddar sedan omvandlade meddelanden till en målplats med en lastare. Dessa tre komponenter parametreras i en konsumentapplikationsfunktion som den som visas i följande kod:

type Connector struct { ctx context.Context logger *zerolog.Logger consumer *Consumer handler handlerFuncMap loader *Loader
} func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector { return &Connector{ ctx: ctx, logger: logger, consumer: consumer, handler: handler, loader: loader, }
} func (connector *Connector) Close() error { var err error = nil if connector.consumer != nil { err = connector.consumer.Close() } if connector.loader != nil { err = connector.loader.Close() } return err
} func (connector *Connector) Run() error { wg := sync.WaitGroup{} defer wg.Wait() handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic) if err != nil { return err } for { msg, err := connector.consumer.Consume(connector.ctx) if err != nil { if errors.Is(context.Canceled, err) { break } } wg.Add(1) go func(key, value []byte) { defer wg.Done() err = handlerFunc(connector.ctx, connector.loader, key, value) if err != nil { connector.logger.Err(err).Msg("") } }(msg.Key, msg.Value) } return nil
}

Implementera konsumentapplikationen

För att uppnå maximal parallellitet, containeriserar SOCAR konsumentapplikationen och distribuerar den i flera pods på Amazon EKS. Varje konsumentapplikation innehåller en unik konsument, lastare och hanterare. Om du till exempel behöver ta emot meddelanden från ett enda ämne med fem partitioner, kan du distribuera fem identiska konsumentapplikationer, som var och en körs i sin egen pod. På samma sätt, om du har två ämnen med tre partitioner var, bör du distribuera två konsumentapplikationer, vilket resulterar i totalt sex poddar. Det är en bästa praxis att köra en konsumentapplikation per ämne, och antalet poddar bör matcha antalet partitioner för att möjliggöra samtidig meddelandebehandling. Podnumret kan anges i Kubernetes-distributionskonfigurationen

Det finns två steg i Dockerfilen. Det första steget är byggare, som installerar byggverktyg och beroenden, och bygger applikationen. Det andra steget är runner, som använder en mindre basbild (Alpine) och kopierar endast nödvändiga filer från byggarstadiet. Den ställer också in lämpliga användarbehörigheter och kör applikationen. Det är också värt att notera att byggarscenen använder en specifik version av Golang-bilden, medan löparscenen använder en specifik version av den alpina bilden, som båda anses vara lätta och säkra bilder.

Följande kod är ett exempel på Dockerfilen:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector . # runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Slutsats

I det här inlägget diskuterade vi SOCARs tillvägagångssätt för att bygga en konsumentapplikation som möjliggör IoT-strömning i realtid från Amazon MSK för att rikta in sig på platser som ElastiCache för Redis. Vi hoppas att du tyckte att det här inlägget var informativt och användbart. Tack för att du läste!


Om författarna

SangSu Park är chef för operationsgruppen på SOCAR. Hans passion är att fortsätta lära sig, anamma utmaningar och sträva efter ömsesidig tillväxt genom kommunikation. Han älskar att resa på jakt efter nya städer och platser.

jaehongJaeHong Ahn är en DevOps-ingenjör i SOCARs molninfrastrukturteam. Han är dedikerad till att främja samarbete mellan utvecklare och operatörer. Han tycker om att skapa DevOps-verktyg och är engagerad i att använda sina kodningsförmåga för att hjälpa till att bygga en bättre värld. Han älskar att laga läckra måltider som privat kock åt sin fru.

bdb-2857-youngguYounggu Yun arbetar på AWS Data Lab i Korea. Hans roll innebär att hjälpa kunder över hela APAC-regionen att nå sina affärsmål och övervinna tekniska utmaningar genom att tillhandahålla föreskrivande arkitektonisk vägledning, dela bästa praxis och bygga innovativa lösningar tillsammans.

Tidsstämpel:

Mer från AWS Big Data