[C][LabVIEW] Własne DLL vs LabVIEW oraz zabawy ptaszkiem czyli integracja LV z brokerem Kafka

Tutaj umieszczamy tematy związane z językami programowania niepasującymi do innych działów.
Regulamin forum
Temat prosimy poprzedzić nazwą języka umieszczonego w nawiasach kwadratowych np. [Pascal].
Awatar użytkownika
tasza
Geek
Geek
Posty: 1082
Rejestracja: czwartek 12 sty 2017, 10:24
Kontaktowanie:

[C][LabVIEW] Własne DLL vs LabVIEW oraz zabawy ptaszkiem czyli integracja LV z brokerem Kafka

Postautor: tasza » środa 17 lip 2019, 22:02

☘ ♬ ♬ ♬ Moja muzyka do kodowania ♬ ♬ ♬ ☘
♫ ♩ ♪ Batushka ♩ ☘ ♪ Litourgiya ♪ ♩ ♫
https://youtu.be/xgfa5UlZAL8


No wiem, grunt to zapodać dobry tytuł ... no ale inaczej to nikt by tutaj nie zajrzał, a temat jest uważam ciekawy.

Post ten to swego rodzaju kontynuacja zagadnień stosu ELK i możliwości jego zasilania danymi z poziomu aplikacji przygotowanych w LabVIEW. Poprzednio pokazałam, jak pisać w LV bezpośrednio do Elasticsearch, całość niby działała, ale wydajność na poziomie ~100 komunikatów na sekundę, to ... no cóż, bywało lepiej. Tu zmienimy podejście i dostawimy nowy komponent - system dystrybucji komunikatów (broker) Kafka. Oczywiście lekko nie będzie, ponieważ integracja LV i Kafki wymaga pewnych ceregieli, ale całość jest do ogarnięcia. A więc najpierw...

Tematem Kafki jako takiej zajmiemy się za chwilę, póki co trzeba mi odczarować jeden drobny aspekt kodowania w LabVIEW, a mianowicie zagadnienie dołączania zewnętrznych (własnych lub obcych) bibliotek DLL do naszej aplikacji. Nie widzę potrzeby strzępić dzioba, przy tak dobrych opracowaniach dostępnych online, zatem polecam:

:arrow: https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019Ls1SAE&l=pl-PL
:arrow: An Overview of Accessing DLLs or Shared Libraries from LabVIEW
:arrow: How to Call Win32Dynamic Link Libraries (DLLs)from LabVIEW
:arrow: Writing Win32 Dynamic LinkLibraries (DLLs) and Calling Themfrom LabVIEW

Odnośnie narzędzia do kodowania pod Windows - większość opracowań dotyka różnych wersji Microsoft Visual C, ja nieco na przekór zaproponuję przygotowanie biblioteki w pakiecie DEV-C++. Narzędzie to ma pewnie i swoje wady, ale jest w tym akurat przypadku bardzo dobrą alternatywą dla MSVC, no i jest za gratis.

Cały eksperyment sprowadza się do utworzenia projektu biblioteki DLL dla platformy Windows 32 bit, dostaniemy wstępnie zakodowany szkielet funkcji głównej DllMain(). Cała nasza praca to wypełnienie tego treścią (docelowo - połączeniem z Kafką), póki co poprzestańmy na klasycznym demku - wywołaniu systemowej funkcji MessageBox().

W pliku nagłówkowym deklarujemy funkcję sayHello() opatrując wymaganymi do eksportu dyrektywami:

kafkalv.h pisze:

Kod: Zaznacz cały

#define DLL_EXPORT __declspec(dllexport)
extern "C" {
   void DLL_EXPORT sayHello( char *, char * );         
}


W pliku kafkalv.cpp definiujemy ciało funkcji, jak widać - trywialne:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void DLL_EXPORT sayHello( char *pText, char *pCaption ){
   log( "calling with: %s, %s", pText, pCaption );
   MessageBox( 0, pText, pCaption, MB_ICONINFORMATION );
   log( "done", NULL);
}


Fajne jest to, że DEV-C++ samodzielnie zadba o przygotowanie pliku z listą funkcji do eksportu w nasze DLL-ce, taki oto pliczek powstaje przy okazji i proszę w nim nie grzebać ręcznie, raczej poprawiać deklarację funkcji, gdy LabVIEW będzie miało problemy z widocznością czy do nich dostępem:

libkafkalv.def pisze:

Kod: Zaznacz cały

EXPORTS
    ...
    sayHello @5


Teraz jest też dobra okazja opowiedzieć o pokrace programistycznej czyli widocznej w wywołaniach powyżej funkcji log(). Niespecjalnie jestem dumna z tego...czegoś, ale poza kilkoma wadami (tu szczególnie: koszt wykonania) ona ma tę zaletę, że skutecznie działa, zrzuca dane na dysk natychmiast i nie blokuje dostępu do generowanego pliku logu. Aby nie bawić się w kotka i myszkę z funkcją logującą, przyjęłam że plik logu będzie zrzucany w tym samym katalogu, w którym znajduje się plik *.dll. Stąd obecność funkcji GetModuleFileName() i wywołanie na jej potrzeby GetDllHandle(). Całość zwraca ścieżkę do aktualnie załadowanej instancji biblioteki DLL, nawet jeżeli proces ją wołający powoływany jest z zupełnie innego foldera. Tu dokumentacja funkcji systemowej :arrow: VirtualQuery, a funkcja GetDllHandle() poniżej:

kafkalv.cpp pisze:

Kod: Zaznacz cały

HMODULE GetDllHandle() {
    static int dummy = 0;
    MEMORY_BASIC_INFORMATION mbi;
    if( !VirtualQuery( &dummy, &mbi, sizeof( mbi ) ) ) {
        return NULL;
    }
    return (HMODULE)mbi.AllocationBase;
}


Sama funkcja logująca, akceptująca zmienną listę parametrów wygląda tak:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void log( const char *fmt, ... ) {
   FILE *hLog = NULL;   
   char szFullPath[0xFF];
   char szDir[0xFF];
   char szLogFileName [0xFF];
    va_list argptr;   
    SYSTEMTIME systime;

    GetLocalTime( &systime );
   
       GetModuleFileName ( GetDllHandle(), szFullPath, 0xFF );
      char *pLastSlash = strrchr( szFullPath, '\\' );
      *pLastSlash = 0x00;   
      sprintf(
         szLogFileName,
         "\\kafkalv-%04d%02d%02d.txt",
         systime.wYear,
         systime.wMonth,
         systime.wDay
         );
      strcat ( szFullPath, szLogFileName );
       hLog = fopen( szFullPath, "a" );
       if (!hLog) {
          return;
      }
      
    fprintf(
       hLog,   
        "[%04d-%02d-%02dT%02d:%02d:%02d.%03d][%s][%d] ",
      systime.wYear,
      systime.wMonth,
      systime.wDay,
      systime.wHour,
      systime.wMinute,
      systime.wSecond,
      systime.wMilliseconds,
        __FILE__,
        __LINE__
    );
    va_start( argptr, fmt) ;   
    vfprintf ( hLog, fmt, argptr );   
    va_end(argptr);
    fprintf( hLog, "\n" );
   fclose( hLog );   
}


Przejdźmy teraz do wnętrzności funkcji DllMain(), prostych ale ze względu na wywołania funkcji log() pozwalających zaobserwować co LabVIEW wyczynia z podłożoną mu zewnętrzną biblioteką DLL:

kafkalv.cpp pisze:

Kod: Zaznacz cały

BOOL WINAPI DllMain(HINSTANCE hinstDLL,DWORD fdwReason,LPVOID lpvReserved) {
   switch( fdwReason ) {
      case DLL_PROCESS_ATTACH: {
         log ( "hInst:%08X, DLL_PROCESS_ATTACH ------------------------------ ", hinstDLL );
         break;
      }
      case DLL_PROCESS_DETACH: {
         log ( "hInst:%08X, DLL_PROCESS_DETACH  ------------------------------ ", hinstDLL );         
         break;
      }
      case DLL_THREAD_ATTACH: {
         log ( "hInst:%08X, DLL_THREAD_ATTACH", hinstDLL );         
         break;
      }
      case DLL_THREAD_DETACH:{
         log ( "hInst:%08X, DLL_THREAD_DETACH", hinstDLL );                  
         break;
      }
   }
   return TRUE;
}


Kreseczki przy process attach/detach to początek i koniec segmentu logu, zaraz zobaczymy w praktyce jak to działa. Malunki poniżej to minimalistyczna aplikacja w LabVIEW wywołująca wspomnianą funkcję sayHello() z naszej biblioteki, widzimy efekt działania jak i okna konfiguracyjne kostki :arrow: Call Library Function Node:

00_simple_dll_call.gif


Budowanie takiej aplikacji w LV, jak również zupełnie prostą koderkę w DEV-C++ widzimy na żywo na filmiku:
https://youtu.be/q1FUSa6IrRo

Fragment logu aplikacji, który powstał w ramach demka:

kafkalv-20190717.txt pisze:

Kod: Zaznacz cały

[2019-07-17T16:45:40.318][kafkalv.cpp][60] hInst:62500000, DLL_PROCESS_ATTACH ------------------------------
[2019-07-17T16:46:06.236][kafkalv.cpp][60] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-17T16:46:27.614][kafkalv.cpp][60] calling with: to jest tekst w okienku, to jest tytuł okienka
[2019-07-17T16:46:33.889][kafkalv.cpp][60] done
[2019-07-17T16:46:35.857][kafkalv.cpp][60] calling with: to jest tekst w okienku, to jest tytuł okienka
[2019-07-17T16:46:37.484][kafkalv.cpp][60] done
[2019-07-17T16:46:47.513][kafkalv.cpp][60] hInst:62500000, DLL_THREAD_DETACH
[2019-07-17T16:46:51.903][kafkalv.cpp][60] hInst:62500000, DLL_PROCESS_DETACH  ------------------------------



I jak to czytamy, a mianowicie:
- już podczas konfigurowania kostki `Call Library Function Node` biblioteka DLL jest ładowana do pamięci i wywoływana jest DllMain() z fdwReason=DLL_PROCESS_ATTACH
- DLL_THREAD_ATTACH to uruchomienie funkcji składowych biblioteki, LV obsługuje to w wątku wskazanym konfiguracją - u mnie wątek interfejsu użytkownika (UI)
- DLL_THREAD_DETACH to zwolnienie biblioteki w kontekście bieżącego wątku
- finalnie DLL_PROCESS_DETACH - odładowanie biblioteki, następuje przy zamknięciu okna *.VI bieżącej aplikacji korzystającej z naszej biblioteki. Jeżeli miałabym odpalone to cudo w dwóch identycznych okienkach LV - detach zostałby zawołany z chwilą zamknięcia ostatniego z nich, to oczywiste.

W ramach napisów końcowych - garstka wniosków.

Po pierwsze - tworzenie własnych, nawet najprostszych DLL do wykorzystania w LabVIEW jest dla ludzi i nie trzeba ku temu wypaśnych komercyjnych narzędzi, z powodzeniem wystarczą darmowe. Po drugie warto chwilę czasu poświęcić na zaznajomienie się z cyklem życia takiej DLL w kontekście życia aplikacji w LV, dowiemy się wtenczas jakie zależności czasowe wiążą odpowiednie wywołania do wnętrza biblioteki, to ważne ze względu na alokację i zwalnianie zasobów, choćby tak podstawowych jak zamykanie uchwytów otwartych przez DLL plików. Po trzecie, tego w sumie nie pokazałam, ale należy mieć świadomość, że kod funkcji z DLL wykonuje się w kontekście przestrzeni adresowej procesu aplikacji LabVIEW, a nie żadnym tam odizolowanym 'sandbox' czy innej piaskownicy. Czyli jeżeli damy ciała w naszej zewnętrznie ładowanej funkcji to mamy prawie pewność, że główna aplikacja LV także ulegnie destabilizacji, szczególnie spektakularne są tu strzały typu '0xC0000005: Access violation reading location' które objawiają się...przy zamykaniu dłużej działającej aplikacji LabVIEW. Po prostu trzeba uważać.

No i słowo przejściowe - taki jakby mostek do drugiej części.

Często bywa tak, że w LabVIEW chcemy wykorzystać API innego produktu, czy to bazy danych czy może jakiegoś dziwacznego, ale cennego funkcjonalnie sterownika do sprzętu, bywa różnie. I często zdarza się tak, że owe API jest tak masakrystycznie pokręcone, struktury danych, którymi operuje są wręcz perwersyjnie zbudowane, normalnie - przesiadka z nich na LV to tylko siąść i ryczeć.
I wtenczas co robimy?
Ano przykrywamy te narzucone wywołania własną warstwą autorskiego oprogramowania (wrapperem), opakowując całą tę upierdliwość zewnętrznie narzuconego interfejsu własnymi, prostymi wywołaniami. Nad którymi w miarę panujemy i co ważne - możemy je modyfikować, zależnie od bieżących potrzeb. I o tym będzie dalej.

#slowanawiatr
Nie masz wymaganych uprawnień, aby zobaczyć pliki załączone do tego posta.
______________________________________________ ____ ___ __ _ _ _ _
Kończysz tworzyć dopiero, gdy umierasz. (Marina Abramović)

Awatar użytkownika
tasza
Geek
Geek
Posty: 1082
Rejestracja: czwartek 12 sty 2017, 10:24
Kontaktowanie:

Re: [C][LabVIEW] Własne DLL vs LabVIEW oraz zabawy ptaszkiem czyli integracja LV z brokerem Kafka

Postautor: tasza » piątek 19 lip 2019, 13:16

♬ ♬ ♬ Moja muzyka do kodowania ♬ ♬ ♬☘
♫ ♩ ♪ Batushka ♫ ♩ ♪ Панихида / Requiem ♪ ♩ ♫
https://youtu.be/XTwdZVaq4vQ


Po drobnym wprowadzeniu w temat podkładania własnych DLL aplikacjom w LabVIEW czas na tytułowego ptaszka czyli broker Kafka.

skrzydlaty posłaniec

O ptaszysku w zaiste telegraficznym skrócie - Apache Kafka to system przesyłania i dystrybucji komunikatów, pracuje w oparciu o model publish-subscribe. Aplikacje chcące udostępnić swoje wiadomości (np. informacje o zdarzeniach) publikują je w obrębie struktury danych zwanej tematem (ang. topic), czasami spolszcza się do `topik`. Życiowa analogia Kafki to tematyczna tablica ogłoszeń. Przychodzą ludziki i nalepiają swoje ogłoszenia, każdy w odpowiednim dla treści sektorze tablicy (topiku) - to publikacja. Inni przychodzą i czytają te ogłoszenia, zapamiętują gdzie ostatnio skończyli, jak wrócą - zerkają od tego miejsca, poszukując nowych kartek - to subskrypcja. Oczywiście tablica ma skończoną pojemność więc jakoś trzeba tym stadem karteczek-komunikatów zarządzać. Tu z pomocą przychodzą (pojadę na fachowo) polityki retencji - można ustalić, że klej na karteczkach będzie trzymał nie dłużej niż zdefiniowany arbitralnie czas (dla całego brokrea lub wybranego topiku) i po zadanym czasie kartka spadnie, komunikat zniknie. Można też zdefiniować, że gdy tablica zacznie się zapełniać, najstarsze kartki-wiadomości, zostaną zmiecione, aby zwolnić miejsce dla nowych. Format i treść komunikatów jest dla brokera dowolna (XML,JSON,tekst,cokolwiek) - ważne, aby klienci umieli się w tejże odnaleźć i skorzystać. No i jak zauważamy - subskrybentów może być wielu, każdy sobie interesujący go topik przeczyta i zrobi z informacją to co potrzeba. Dalej tradycyjnie, wmiast rybki na talerzu będzie podpowiedź gdzie jest najbliższy staw, zachęcam do poczytania w wolnej chwili:

:arrow: Podstawy działania Kafki
:arrow: Apache Kafka
:arrow: Apache Kafka Tutorial — Kafka For Beginners
:arrow: Apache Kafka - Quickstart
:arrow: Setup and run Apache KAFKA on Windows
:arrow: Apache Kafka Idempotent Producer - Avoiding message duplication
:arrow: Kafka Clients (At-Most-Once, At-Least-Once, Exactly-Once, and Avro Client)

Odnośnie części klienckiej brokera Kafka - w kontekście interesującego nas języka C/C++ wyboru wielkiego nie mamy. Wprawdzie niskopoziomowy protokół jest bardzo dobrze opisany :arrow: Kafka protocol guide, ale samodzielne przygotowanie klienta to jednak nieco karkołomna sprawa. Z pomocą przychodzi całkiem sensowna biblioteka :arrow: librdkafka w połączeniu z przykładami i opisem API :arrow: librdkafka documentation pozwala całkiem sprawnie dobrać się do brokera. Przykładem użyteczności wspomnianej biblioteki jest bardzo przydatne narządko :arrow: kafkacat, polecam skompilować sobie ten programik, jest naprawdę fajny.

Wspomnę jeszcze o ważnym narzędziu do pracy z Kafką - Kafka Tool, oprogramowanie to jest za darmo do domowego, osobistego wykorzystania i z dostępnych w sieci GUI działa najsensowniej, choć to oczywiście moja subiektywna opinia.

Rysunek poniżej to taka nieuczesana wizja tego, co zaraz tu zmajstrujemy.

kafka-archi.png


Część Elastic+Kibana to już zostawię w spokoju, ot potrzeba finalnego magazynu na komunikaty i interfejsu do ich oglądania. Zajmiemy się najpierw podłączeniem do Kafki z poziomu LabVIEW

publikacja wiadomości

Zerknijmy jak z poziomu języka C wysyła się komunikaty tekstowe do brokera, na wskazany topik, proszę:
:arrow: rdkafka_simple_producer.c.
Są jak widać następujące rzeczy od wykonania:
- funkcją `rd_kafka_conf_new` utworzyć tymczasowy obiekt konfigurujący klienta `rd_kafka_conf_t` i ustawić w nim via `rd_kafka_conf_set` wskazanie na broker (nazwa maszyny i port)
- funkcją `rd_kafka_new` utworzyć obiekt producenta wiadomości `rd_kafka_t` na podstawie konfiguracji
- funkcją `rd_kafka_topic_new` utworzyć obiekt dostępowy to topiku `rd_kafka_topic_t`
- w pętelce wysłać tyle komunikatów ile potrzeba funkcją `rd_kafka_produce`
- zapewnić, że wewnętrzna kolejka komunikatów się wysyci i wszystko pójdzie precz, funkcja `rd_kafka_poll`
- pozwalniać zasoby (`rd_kafka_topic_destroy`, `rd_kafka_destroy`)

Funkcje biblioteczne librdkafka są w miarę przyjaźnie skonstruowane i nie ma tam specjalnych dziwadeł, pisząc w C czy C++ całkiem szybko ogarniamy temat. Jednak ich bezpośrednie wykorzystanie w LabVIEW wydało mi się cokolwiek kłopotliwe, stąd poukładałam całość zagadnienia w cztery funkcje-przykrywki, które można spokojnie zawołać z LV kostką `Call Library Node`. Tak oto mamy kolejno:

Inicjalizacja ze wskazaniem na instancję brokera Kafka:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void DLL_EXPORT KafkaInit( char *pszBrokers ) {
   log ( "%s", __func__ );
   hTempConfig = rd_kafka_conf_new();   
   log ( "bootstrap.servers = %s", pszBrokers );            
   if ( rd_kafka_conf_set( hTempConfig, "bootstrap.servers", pszBrokers , errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
      log ( "%s", errstr );               
      log ( "%s", rd_kafka_err2str( rd_kafka_last_error() ) );         
      return;
   }         
   rd_kafka_conf_set_dr_msg_cb( hTempConfig, mesageDeliveryCallback );                     
   log ( "creating producer", NULL );               
   hProducerInstance = rd_kafka_new( RD_KAFKA_PRODUCER, hTempConfig, errstr, sizeof(errstr) );   
   if ( !hProducerInstance ) {
      log ( "%s", errstr );                     
      log ( "%s", rd_kafka_err2str( rd_kafka_last_error() ) );         
      return;
   }
   log ( "producer %08X done", hProducerInstance );   
}


Uzyskanie dostępu do istniejącego topiku, jeżeli takiego nie ma, zostanie utworzony (btw: tego powinno się unikać, no ale niech już zostanie), zwracany jest wskaźnik na topik, będzie dalej wykorzystywany.

kafkalv.cpp pisze:

Kod: Zaznacz cały

unsigned long DLL_EXPORT KafkaGetTopic( char *pszTopicName ) {
   log ( "%s", __func__ );   
   log ( "topic name = %s", pszTopicName );                  
   rd_kafka_topic_t *hTopic = rd_kafka_topic_new( hProducerInstance, pszTopicName, NULL ) ;   
   if ( !hTopic ) {
      log ( "%s", errstr );                           
      log ( "%s", rd_kafka_err2str( rd_kafka_last_error() ) );         
      return 0;
   }
   hTopics [ iTopics ] = (unsigned long)hTopic;
   log ( "topic %d %08X done", iTopics, hTopics [ iTopics ] );               
   iTopics++;
   return (unsigned long)hTopic;
}


Publikacja komunikatu tekstowego na wskazany uchwytem topik:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void DLL_EXPORT KafkaPublishMessage( unsigned long ulTopicHandle, char *pszMessageBody ) {
   log ( "%s (%08X, %s)", __func__, ulTopicHandle, pszMessageBody );      
   rd_kafka_produce(
      (rd_kafka_topic_t*)ulTopicHandle,    // handle topika
      RD_KAFKA_PARTITION_UA,            // uzyj wbudowanego partycjonera
      RD_KAFKA_MSG_F_COPY,            // pracuj na kopii komunikatu
      pszMessageBody,                // tresc komunikatu
      strlen(pszMessageBody),            // i dlugosc
      NULL,                         // nie podajemy klucza
      0,                           // wiec dlugosc 0
      NULL                        // msg_opaque, nie używamy tu :(
   );
   rd_kafka_poll( hProducerInstance , 0 );   // popchniecie do wysylki      
}


No i oczywiście zwolnienie zasobów połączone z dopchnięciem kolanem niewysłanych jeszcze wiadomości:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void DLL_EXPORT KafkaRelease( void ) {
   log ( "%s", __func__ );      
   log ( "dequeue %d messages", rd_kafka_outq_len( hProducerInstance ) );                           
    while ( rd_kafka_outq_len( hProducerInstance ) > 0 ) {
        rd_kafka_poll( hProducerInstance, 100 );          
    }
   log ( "done", NULL );   
   for (int i = 0; i < iTopics; i++ ) {
      rd_kafka_topic_t *hT = (rd_kafka_topic_t*)hTopics [ i ];
      log ( "gonna destroy %d %08X topic", i, hT );                                 
      rd_kafka_topic_destroy( (rd_kafka_topic_t*)hT );      
      hTopics [ i ] = 0;      
      log ( "done", NULL );                     
   }   
   iTopics   = 0;
   log ( "gonna destroy %08X producer", hProducerInstance );                                 
   rd_kafka_destroy( hProducerInstance );   
   log ( "done" );                           
}


Aha, w KafkaInit() widzimy wywołanie funkcji bibliotecznej rd_kafka_conf_set_dr_msg_cb(), już wyjaśniam - to jest rejestracja callback-u, procedurki użytkownika wywoływanej przy każdym potwierdzeniu przyjęcia wiadomości przez broker ewentualnie problemów z zapisaniem wiadomości. Tam możemy podjąć akcje typu ponowienie, zalogować informację o zamkniętej (lub odrzuconej) transakcji etc, dość przydatna rzecz. Ja po prostu zapisuje do logu, że komunikat skutecznie dotarł do Kafki, o tak:

kafkalv.cpp pisze:

Kod: Zaznacz cały

void mesageDeliveryCallback (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
   log ( "%s", __func__ );   
    if ( rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
        log( "Message not delivered, %s", rd_kafka_err2str(rkmessage->err) );
    }
   else {
        log( "Message delivered, payload %d bytes", (int)rkmessage->len );
    }
}


Oczywiście, wszelkie te dobra wyeksponowane na zewnątrz DLL-ki:

kafkalv.h pisze:

Kod: Zaznacz cały

#ifndef _DLL_H_
#define _DLL_H_
#define DLL_EXPORT __declspec(dllexport)
extern "C" {
   void DLL_EXPORT KafkaInit( char* );   
   unsigned long DLL_EXPORT KafkaGetTopic( char* );   
   void DLL_EXPORT KafkaPublishMessage( unsigned long, char* );   
   void DLL_EXPORT KafkaRelease( void );         
}
#endif


Kompletny projekt dla DEV-C++ jest załączniku.

Przechodząc na poziom LV skleciłam taką oto aplikację testową, udziwnienie polega na tym, że chciałam przy okazji sprawdzić, czy biblioteka librdkafka pracuje wielowątkowo - stąd cztery niezależne pętelki piszące do czterech topików. Ponieważ zadana jest taka sama ilość komunikatów - można dla uproszczenia zmierzyć czas w obrębie slajdu z pętlami a jako ilość wiadomości - sumę wykonań zdefiniowanych pętli. Pomiar czasu końcowego odbywa się po wykonaniu kostki wywołującej KafkaRelease(), ponieważ tam jest finalizacja wszelkich wysyłek, a to chwilkę jednak trwa.

kafka__labview__1p.png

kafka__labview__1d.png


wysyłka jednowątkowa

Test to wysyłka sumarycznie 40 tysięcy komunikatów, w 100 cyklach - inicjalizacja/wysyłka/sprzątanie.
Pierwszy wariant to wykonanie kostki `Call Library Node` z opcją `Run in UI thread`, ponieważ tu jest taki jakby haczyk. W centralnej części diagramu widzimy cztery równoległe pętle, no super - ktoś pomyśli że to się wykona "równolegle" w kilku wątkach. No i właśnie nie bardzo, ponieważ w pętli jest wywoływana zewnętrzna funkcja, której użycie jest obwarowane restrykcją na główny wątek interfejsu użytkownika, a ten jak łatwo zgadnąć - mamy jeden. LabVIEW jest chytre i wykrywa takie niuanse, odpowiednio sterując wykonaniem diagramu, w tym przypadku zastosowany bedzie podział czasu pomiędzy pętle - najpierw pochodzi pierwsza, potem druga, kolejna ... i tak w kółko. Każda po troszku, aż zrobią swoje. I zauważmy też, że nie występuje tu ryzyko korupcji danych wynikające z możliwości wpłynięcia wywołania z jednego wątku na inny wątek ... bo cały czas jest jeden - UI.

Na filmiku, na żywo wygląda to następująco:

https://youtu.be/qQaf_MgXJoo

Po zakończeniu wysyłki pokazuje Kafka Tool-em ile nagromadziło się wiadomości w kolejnych topikach, widać że nic się nie zgubiło i w każdym z czterech tematów mamy po dziesięć tysięcy wiadomości. Zerknijmy we fragment logu aplikacji:

kafkalv-20190719-UI.txt pisze:

Kod: Zaznacz cały

[2019-07-19T08:38:20.917][kafkalv.cpp][59] hInst:62500000, DLL_PROCESS_ATTACH ------------------------------
[2019-07-19T08:38:23.847][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:38:55.734][kafkalv.cpp][59] KafkaInit
[2019-07-19T08:38:55.737][kafkalv.cpp][59] bootstrap.servers = localhost:9092
[2019-07-19T08:38:55.739][kafkalv.cpp][59] creating producer
[2019-07-19T08:38:55.743][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:38:55.744][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:38:55.746][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:38:55.746][kafkalv.cpp][59] producer 0BCCE3C0 done
[2019-07-19T08:38:55.760][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:38:55.770][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:38:55.772][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.2
[2019-07-19T08:38:55.776][kafkalv.cpp][59] topic 0 0BD168F8 done
[2019-07-19T08:38:55.778][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:38:55.780][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.3
[2019-07-19T08:38:55.782][kafkalv.cpp][59] topic 1 0BD16C20 done
[2019-07-19T08:38:55.784][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:38:55.786][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.4
[2019-07-19T08:38:55.789][kafkalv.cpp][59] topic 2 0BD16F48 done
[2019-07-19T08:38:55.794][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:38:55.796][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.1
[2019-07-19T08:38:55.798][kafkalv.cpp][59] topic 3 0BD17270 done
[2019-07-19T08:38:55.801][kafkalv.cpp][59] KafkaPublishMessage (0BD168F8, to jest tesowy wpis (2)  0)
...
[2019-07-19T08:38:55.840][kafkalv.cpp][59] KafkaPublishMessage (0BD168F8, to jest tesowy wpis (2)  15)
[2019-07-19T08:38:55.842][kafkalv.cpp][59] KafkaPublishMessage (0BD16C20, to jest tesowy wpis (3)  0)
...
[2019-07-19T08:38:55.896][kafkalv.cpp][59] KafkaPublishMessage (0BD16C20, to jest tesowy wpis (3)  18)
[2019-07-19T08:38:55.898][kafkalv.cpp][59] KafkaPublishMessage (0BD16F48, to jest tesowy wpis (4)  0)
...
[2019-07-19T08:38:55.950][kafkalv.cpp][59] KafkaPublishMessage (0BD16F48, to jest tesowy wpis (4)  16)
[2019-07-19T08:38:55.956][kafkalv.cpp][59] KafkaPublishMessage (0BD17270, to jest tesowy wpis (1) 0)
...
[2019-07-19T08:38:56.002][kafkalv.cpp][59] KafkaPublishMessage (0BD17270, to jest tesowy wpis (1) 11)
[2019-07-19T08:38:56.009][kafkalv.cpp][59] KafkaPublishMessage (0BD168F8, to jest tesowy wpis (2)  16)
...
[2019-07-19T08:38:56.060][kafkalv.cpp][59] KafkaPublishMessage (0BD168F8, to jest tesowy wpis (2)  31)
[2019-07-19T08:38:56.063][kafkalv.cpp][59] KafkaPublishMessage (0BD16C20, to jest tesowy wpis (3)  19)
...


Mamy wylistowane na starcie wartości uchwytów do topików: 0BD168F8,0BD16C20,0BD16F48,0BD17270 i widać ładnie jak LV pomimo upierdliwego ograniczenia swobody w wołaniu DLL-ki próbuje jakoś dzielić czas pętli, zachowując wrażenie równoległego wykonania.

Skoro już mamy log, to zerknijmy na jeszcze jedną sprawę - wysyłkę online vs wewnętrzne kolejkowanie. Mały grep po logu - zwróconych linijek jest dokładnie 100 - tyle mamy obiegów zewnętrznej pętli w LV:

grep dequeue kafkalv-20190719-UI.txt pisze:

Kod: Zaznacz cały

[2019-07-19T08:38:56.360][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:38:58.397][kafkalv.cpp][59] dequeue 139 messages
[2019-07-19T08:38:59.800][kafkalv.cpp][59] dequeue 62 messages
[2019-07-19T08:39:00.667][kafkalv.cpp][59] dequeue 220 messages
[2019-07-19T08:39:02.073][kafkalv.cpp][59] dequeue 30 messages
[2019-07-19T08:39:02.990][kafkalv.cpp][59] dequeue 82 messages
[2019-07-19T08:39:03.777][kafkalv.cpp][59] dequeue 365 messages
[2019-07-19T08:39:05.654][kafkalv.cpp][59] dequeue 171 messages
[2019-07-19T08:39:07.128][kafkalv.cpp][59] dequeue 27 messages
[2019-07-19T08:39:08.113][kafkalv.cpp][59] dequeue 78 messages
[2019-07-19T08:39:09.176][kafkalv.cpp][59] dequeue 188 messages
[2019-07-19T08:39:10.653][kafkalv.cpp][59] dequeue 28 messages
[2019-07-19T08:39:11.273][kafkalv.cpp][59] dequeue 260 messages
[2019-07-19T08:39:12.977][kafkalv.cpp][59] dequeue 10 messages
[2019-07-19T08:39:14.023][kafkalv.cpp][59] dequeue 16 messages
....


Widzimy, że wiadomości z pierwszego obiegu aplikacji niby wysłane online - praktycznie w całości ( 4 x 100 sztuk ) wylądowały w kolejce buforowej i dopiero z niej zostały szybko wypchnięte w stronę brokera, na docelowe topiki. Rozruch nieco ciężki, ale zakładam że po stronie Kafki musiały zajść jeszcze jakieś czynności ( wkońcu to jest klient-serwer ) w związku z zapięciem się mojego labviowego producenta. Potem aplikacji szło całkiem dobrze i zaległości do wypchnięcia po zakończeniu równoległych pętli nie były zbyt duże, a finalnie uzyskana efektywność wysyłki była na poziomie 350..400 wiadomości na sekundę.

wysyłka wielowątkowa

Kolejny test, to drobna zmiana podejścia, przeklikałam tę część diagramu, gdzie wołana jest kostka 'Call Library...' i funkcja KafkaPublishMessage() zmieniając tryb wykonania na `Run in any thread`. No i zobaczmy co z tego wyszło - filmik:

https://youtu.be/Ms4zWjoobKU

Jak widać, niby jest szybciej (w szczycie zdarzało się grubo ponad 700..800/sek) ale zdarzały się i niepokojące zamulenia do wartości kilkunastu komunikatów na sekundę, dość chaotyczne zmiany szybkości widać z resztą na wykresiku. No i najgorsze jest to, że aplikacja tnie się jak emo po śmierci chomika, dają się zauważyć chwile zamrożenia panelu frontowego (zagłodzony wątek UI). Finalnie wysyłka jakoś się udaje, okienkami Kafka Tool pokazuje, że przybyło kolejne 10 tyś. wiadomości do każdego topika, na masę się zgadza. Zerknijmy jeszcze w log z tego wykonania.

kafkalv-20190719-any.txt pisze:

Kod: Zaznacz cały

[2019-07-19T08:55:36.284][kafkalv.cpp][59] hInst:62500000, DLL_PROCESS_ATTACH ------------------------------
[2019-07-19T08:55:38.519][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:56:25.935][kafkalv.cpp][59] KafkaInit
[2019-07-19T08:56:25.936][kafkalv.cpp][59] bootstrap.servers = localhost:9092
[2019-07-19T08:56:25.938][kafkalv.cpp][59] creating producer
[2019-07-19T08:56:25.944][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:56:25.948][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:56:25.950][kafkalv.cpp][59] producer 098E18E8 done
[2019-07-19T08:56:25.953][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:56:25.965][kafkalv.cpp][59] hInst:62500000, DLL_THREAD_ATTACH
[2019-07-19T08:56:25.973][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:56:25.974][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.2
[2019-07-19T08:56:25.976][kafkalv.cpp][59] topic 0 0BCD4D48 done
[2019-07-19T08:56:25.977][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:56:25.979][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.3
[2019-07-19T08:56:25.981][kafkalv.cpp][59] topic 1 0BCD5070 done
[2019-07-19T08:56:25.983][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:56:25.984][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.4
[2019-07-19T08:56:25.989][kafkalv.cpp][59] topic 2 0BCD5398 done
[2019-07-19T08:56:25.991][kafkalv.cpp][59] KafkaGetTopic
[2019-07-19T08:56:25.993][kafkalv.cpp][59] topic name = LV.TEST.TOPIC.1
[2019-07-19T08:56:25.996][kafkalv.cpp][59] topic 3 0BCD56C0 done
[2019-07-19T08:56:25.999][kafkalv.cpp][59] KafkaPublishMessage (0BCD56C0, to jest tesowy wpis (1) 0)
[2019-07-19T08:56:25.999][kafkalv.cpp][59] KafkaPublishMessage (0BCD5398, to jest tesowy wpis (4)  0)
[2019-07-19T08:56:26.002][kafkalv.cpp][59] KafkaPublishMessage (0BCD56C0, to jest tesowy wpis (1) 1)
[2019-07-19T08:56:25.999][kafkalv.cpp][59] KafkaPublishMessage (0BCD4D48, to jest tesowy wpis (2)  0)
[2019-07-19T08:56:25.999][kafkalv.cpp][59] KafkaPublishMessage (0BCD5070, to jest tesowy wpis (3)  0)
[2019-07-19T08:56:26.004][kafkalv.cpp][59] KafkaPublishMessage (0BCD4D48, to jest tesowy wpis (2)  1)
[2019-07-19T08:56:26.003][kafkalv.cpp][59] KafkaPublishMessage (0BCD56C0, to jest tesowy wpis (1) 2)
[2019-07-19T08:56:26.005][kafkalv.cpp][59] KafkaPublishMessage (0BCD5398, to jest tesowy wpis (4)  2)
[2019-07-19T08:56:26.006][kafkalv.cpp][59] KafkaPublishMessage (0BCD5070, to jest tesowy wpis (3)  1)
[2019-07-19T08:56:26.006][kafkalv.cpp][59] KafkaPublishMessage (0BCD4D48, to jest tesowy wpis (2)  2)
[2019-07-19T08:56:26.006][kafkalv.cpp][59] KafkaPublishMessage (0BCD56C0, to jest tesowy wpis (1) 3)
[2019-07-19T08:56:26.007][kafkalv.cpp][59] KafkaPublishMessage (0BCD5398, to jest tesowy wpis (4)  3)
[2019-07-19T08:56:26.007][kafkalv.cpp][59] KafkaPublishMessage (0BCD5070, to jest tesowy wpis (3)  2)
[2019-07-19T08:56:26.008][kafkalv.cpp][59] KafkaPublishMessage (0BCD4D48, to jest tesowy wpis (2)  3)
[2019-07-19T08:56:26.009][kafkalv.cpp][59] KafkaPublishMessage (0BCD56C0, to jest tesowy wpis (1) 4)
[2019-07-19T08:56:26.012][kafkalv.cpp][59] KafkaPublishMessage (0BCD4D48, to jest tesowy wpis (2)  4)
[2019-07-19T08:56:26.012][kafkalv.cpp][59] KafkaPublishMessage (0BCD5070, to jest tesowy wpis (3)  3)


Widzimy ładnie przeplatające się wywołania funkcji bibliotecznej - pętle for() napędzające wysyłkę na każdy topik pracują równolegle i w miarę równomiernie. Orgastyczna uciecha kończy się niestety po analizie ilości kolejkowanych wewnętrznie komunikatów:

grep dequeue kafkalv-20190719-any.txt pisze:

Kod: Zaznacz cały

[2019-07-19T08:56:26.465][kafkalv.cpp][59] dequeue 225 messages
[2019-07-19T08:56:27.134][kafkalv.cpp][59] dequeue 316 messages
[2019-07-19T08:56:29.220][kafkalv.cpp][59] dequeue 153 messages
[2019-07-19T08:56:29.834][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:56:31.750][kafkalv.cpp][59] dequeue 46 messages
[2019-07-19T08:56:32.282][kafkalv.cpp][59] dequeue 212 messages
[2019-07-19T08:56:32.782][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:56:33.472][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:56:34.387][kafkalv.cpp][59] dequeue 270 messages
[2019-07-19T08:56:35.539][kafkalv.cpp][59] dequeue 250 messages
[2019-07-19T08:56:36.520][kafkalv.cpp][59] dequeue 138 messages
[2019-07-19T08:56:36.907][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:56:37.935][kafkalv.cpp][59] dequeue 238 messages
[2019-07-19T08:56:40.177][kafkalv.cpp][59] dequeue 2 messages
[2019-07-19T08:56:40.453][kafkalv.cpp][59] dequeue 313 messages
[2019-07-19T08:56:41.012][kafkalv.cpp][59] dequeue 400 messages
[2019-07-19T08:56:41.908][kafkalv.cpp][59] dequeue 67 messages
[2019-07-19T08:56:42.747][kafkalv.cpp][59] dequeue 12 messages
[2019-07-19T08:56:42.920][kafkalv.cpp][59] dequeue 400 messages


No i tu jest jakby problem - co z tego, że równolegle pracujące pętle szybko pozbyły się wiadomości i poutykały je w wewnętrznej kolejce buforowej biblioteki, jak po zakończeniu pętli trzeba z tej kolejki zmagazynowany towar wysłać do brokera. A groźne jest to zachowanie ponieważ gdy aplikacja zaliczy nam pad akurat w momencie przejęcia sterowania przez funkcję KafkaRelease() (a tam jest wysyłka zaległości) to możemy się pożegnać z tymi komunikatami.

Ostatni dziś pokaz to wspominany na pierwszym rysunku melanż LabVIEW, Kafki i stosu ELK, tak zupełnie dla ciekawości jak to się uda.
W Kafce są cztery topiki - dane z nich ładowane są do Elastica przez Logstash na podstawie banalnej wręcz konfiguracji strumienia:

grep dequeue kafkalv-20190719-any.txt pisze:

Kod: Zaznacz cały

input {
   kafka {
      bootstrap_servers => "localhost:9092"
        topics => [ "LV.TEST.TOPIC.1",
               "LV.TEST.TOPIC.2",
               "LV.TEST.TOPIC.3",                   
               "LV.TEST.TOPIC.4"   ]
    }
}
output {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "lv-test-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "?"
   }
}


Oczywiście uruchomiony Elastic, do kompletu Kibana 7 oraz ... monitor systemu, ponieważ chciałam pokazać, że Kafka tak daje do wiwatu strumieniem komunikatów aż pióra lecą, proszę - całość na żywo:

https://youtu.be/2VtBr8vcK9w

Z powyższej pisanki wnioski są chyba oczywiste - melanż LabVIEW i Kafki jest możliwy i nie jest to jakaś wiedza tajemna, trzeba tylko przysiąść na czterech literach i nieco poczytać i poeksperymentować. Z dokładością do LabVIEW możemy spokojnie poprzestać na darmowych narzędziach, jest ich sporo, bo Kafka to coraz popularniejszy produkt.

Uważać należy na aplikacje wielowątkowe i dobrze się zastanowić, co chcemy wykonywać w wątku głównym programu, a co w wątkach roboczych. Jest takie powiedzenie - rób dwa razy wolniej, będzie trzy razy szybciej - i w/g mnie do tego zagadnienia całkiem nieźle pasuje. Kod źrdódłowy biblioteki `librdkafka` jest dostępny, analiza pod kątem adaptacji do naszych potrzeb jest realna, kwestia chęci i uporu.

No i na koniec, głównie w kontekście ostatniego filmiku - do takich zabaw to jednak trzeba dysponować niezłym sprzętem, a moim zdaniem najlepiej zabawki stosu ELK oraz samą Kafkę wynieść na mocniejszą maszynę z jakimś 64-bitowym Linuksem. Nie chciałam już tu motać w tekście wszelkimi niedogodnościami związaymi z plikami *.bat czy nagłym zamykaniem się Kafki po starcie, o tu m.in. goglowałam wczoraj Kafka: unable to start Kafka..., na Linuksach ludzie nie mają takich dziwnych problemów jak `java.nio.file.FileSystemException`...


#slowanawiatr
Nie masz wymaganych uprawnień, aby zobaczyć pliki załączone do tego posta.
______________________________________________ ____ ___ __ _ _ _ _
Kończysz tworzyć dopiero, gdy umierasz. (Marina Abramović)


Wróć do „Inne języki programowania”

Kto jest online

Użytkownicy przeglądający to forum: Obecnie na forum nie ma żadnego zarejestrowanego użytkownika i 4 gości