Task oder Job Queues sind ein bewährtes Mittel um komplexe Aufgaben asynchron auszuführen und den Benutzer einer Anwendung nicht unnötig vor einer ladenden Website oder „eingefrorenen“ Software warten zu lassen. Ein weiterer, nicht zu unterschätzender Vorteil ist Auslagerung möglichererweise rechenintensiver Operationen in andere Prozesse, die nichts mit dem Webserver zutun haben.
Das PHP Framework Laravel liefert in seinem Funktionsumfang ebenfalls eine Queue-Funktionalität mit und gibt seinen Nutzeren auf diesem Weg ein Werkzeug an die Hand, das Nutzungsverhalten von komplexeren Laravel-Projekten stark zu verbessern.
In diesem Artikel befasse ich mich mit der Herausforderung einen Importer für CSV-Dateien in Laravel zu integrieren. Die Besonderheit ist, dass diese CSV Datein ungf. 50000 – 100000 Datensätze beinhalten können und der „Echtzeit-Import“ somit nicht möglich ist, schließlich kann der Benutzer der Anwendung nicht 30-60 Minuten warten, bis die Website wieder reagiert.
Für das „Abarbeiten“ der CSV-Datensätze werde ich kein Plugin verwenden. Ich entwickle gemeinsam mit Dir eigene kleine Code–Schnippsel, um CSV-Dateien Datensatz für Datensatz auszulesen und die beinhalteten Daten in eine Datenbank zu speichern.
Was ist eine Task Queue?
Eine Task Queue ist im Grunde eine Liste an Aufgaben, die sequenziell abgearbeitet wird. Welche Aufgabe als nächstes bearbeitet wird hängt von unterschiedlichen Kriterien ab. Der einfachste Fall ist „first in, first out“, also „wer zuerst kommt, mahlt zuerst“. Die Aufgabe die sich zuerst an unserer Queue angemeldet hat, wird auch als erstes bearbeitet. Weiterhin ist es möglich die Aufgaben mit weiteren Parametern zu versehen, wie bspw. einer Priorität, oder einem Wert für die Zeit, die die Aufgabe schon in der Queue verbringt, damit nicht nur Jobs einer höheren Priorität bearbeitet werden.
Queues die auf Prioritäten basieren haben aber eine andere Struktur als einfache Queues. Bei diesem Vorgehen gibt es für jede Prioriät eine eigene Queue. Verändert sich also die Priorität eines Jobs, wird er in eine andere Queue verschoben. Damit befassen ich mich aber in diesem Artikel noch nicht.
Betrachten wir nun erst mal die typischen Rollen, die in einer Queue agieren. Typische Rollen in einem Queue System sind der „Dispatcher“, der die Aufgaben entgegennimmt und sie gegebenenfalls parametrisiert in die Queue einfügt, die Queue als solche, der Queue Worker, der immer die Aufgabe aus der Queue entnimmt, die als nächstes an der Reihe ist und „last but not least“ der Code, der die Aufgabe verarbeitet. In unserem Beispiel ist das der Import einer großen CSV-Datei.
Nachfolgend findest Du eine Grafik, die den Aufbau eines Queue Systems grob skizziert:
Durch diesen Ablauf wird gewährleistet, dass der Webserver durch die komplexen Operationen nicht überlastet wird. Jeder ausstehende Import wird der Reihe nach abgearbeitet.
In der folgenden Grafik habe ich noch mal grob skizziert, wie die Rollenvertreilung in einer priorisierten Queue aufgebaut ist.
Wie wird die Task Queue in Laravel integriert?
Laravel bietet zu diesem Thema, wie auch zu vielen anderen Themen, eine sehr gute Dokumentation unter: Laravel 5.5 – Queues. Beginnen wir erst mal mit der Konfiguration. Im Verzeichnis „config“ Deiner Laravel-Installation findest Du die Queue Konfiguration „queue.php“. In dieser Datei Konfigurierst Du die Queue-Connection. Laravel bietet die Möglichkeit, Deine Queues in unterschiedlichen Systemen zu hinterlegen, wie unteranderem einer Datenbank, Amazons SQS, oder auch Redis. Wie Du Paypal in Laravel integrierst, erfährst Du hier.
In diesem Beitrag entscheide ich mich für den Connection-Typ „database“. Du kannst auch gerne eine andere Variante wählen. Ich gehe im Zuge dieses Beitrags aber lediglich auf die Variante Datenbank ein, da Performance in diesem Fall kein Kriterium ist.
Um nun die Datenbankmigration vorzubereiten, die Dir die benötigten Tabellenstrukturen für die Verwaltung der Jobs bereitstellt, verwendest Du Laravels CLI artisan.
$ php artisan queue:table
Die erstellte Migration integrierst Du über den Befehl „migrate“.
$ php artisan migrate
Die Integration der Job Klasse ist ebenfalls sehr simpel. Du kannst über den „make:job“ Befehl ein Template erzeugen lassen.
$ php artisan make:job ImportCsvFile
Das erzeugte Template sieht nun in etwas wie folgt aus:
<?php namespace AppJobs; use IlluminateBusQueueable; use IlluminateQueueSerializesModels; use IlluminateQueueInteractsWithQueue; use IlluminateContractsQueueShouldQueue; use IlluminateFoundationBusDispatchable; class ImportCsvFile implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $csvFilePath; /** * Create a new job instance. * * @param string $csvFilePath * @return void */ public function __construct($csvFilePath) { $this->csvFilePath = $csvFilePath; } /** * Execute the job. * * @return void */ public function handle() { // Importieren des CSV Datei } }
Jobs verfügen über die Implementierung gegen das Interface ShouldQueue über die statische Methode dispatch. Diese ermöglicht es, den Job an die Queue anzuhängen. Als Parameter erwartet die Methode den CSV Pfad der Datei.
Integration des CSV-Import Controllers
Um das Beispiel etwas greifbarer zu machen zeige ich Dir nun noch, wie Du einen passenden Controller bereitstellst, der den Dispatcher später ausführt. Dafür erstellst Du den Controller „CsvImportController“ über artisan.
$ php artisan make:controller CsvImportController
Im folgen Beispiel führe ich drei Wege an, über die Du den dispatcher ansteuern kannst.
- Direkt
- Verzögert
- Mit anderen Jobs verkettet
<?php namespace AppHttpControllers; use AppJobsImportCsvFile; use IlluminateHttpRequest; use IlluminateSupportFacadesStorage; class CsvImportController extends Controller { public function index () { return view('import.csvfile'); } public function store (Request $request) { // CSV Datei hochladen und temporär speichern $csvFilePath = Storage::putFile('uploads', $request->file('csvfile')); // Import dispatchen ImportCsvFile::dispatch($csvFilePath); /* ### Weitere dispatch Optionen ## */ // Mit Verzögerung // ------------------------------------- // ImportCsvFile::dispatch($csvFilePath) // ->delay(now()->addMinutes(10)); // Job Verkettung // ------------------------------------- // ImportCsvFile::withChain([ // new ValidateCsvFile, // new ImportCsvFileToDatabase // ])->dispatch(); } }
Integration der CSV-Import View und Route
Die View (csvfile.blade.php) ist in diesem Beispiel minimalistisch dargestellt. Sie enthält lediglich ein Formular um die Datei hochzuladen. Als action ist die „named route“ → „csvimport.store“ hinterlegt, die Du im nachfolgenden Schritt integrierst.
<h2>CSV Importieren</h2> <form action="{{ route('csvimport.store') }}" method="post" enctype="multipart/form-data"> {{ csrf_field() }} <label for="csvfile">Bitte wählen Sie eine CSV Datei aus</label> <input type="file" name="csvfile" id="csvfile"> <br /> <button type="submit">Datei importieren</button> </form>
Die Routen trägst Du in der „web.php“ unter dem Verzeichnis „routes“ ein.
Route::get('/csvimport', 'CsvImportController@index')->name('csvimport.index'); Route::post('/csvimport/store', 'CsvImportController@store')->name('csvimport.store');
Ein Blick auf die CSV Datei und ihr Model
Die CSV Datei in diesem Beispiel ist ebenfalls minimalistisch aufgebaut. Ich habe mich für eine simple „Sales“ CSV Datei (Demo: “task_queue_sales.csv“) entschieden, die über die Spalten „day“ und „revenue“ verfügt. Zunächst erstellst Du für das Beispiel das Model „Sales“ und die zugehörige Migrationsdatei über folgenden Befehl.
$ php artisan make:model Sales -m
Im erzeugten Model hinterlegst Du nun noch die beiden Properties „table“ und „fillable“.
<?php namespace App; use IlluminateDatabaseEloquentModel; class Sales extends Model { protected $table = 'sales'; protected $fillable = ['day', 'revenue']; }
Nun erzeugst Du noch eine Migration um die Spalten „day“ und „revenue“ zum Sales Model hinzuzufügen.
$ php artisan make:migration add_properties_to_sales
Die Migrationsdatei passt Du nun folgendermaßen an.
<?php use IlluminateSupportFacadesSchema; use IlluminateDatabaseMigrationsMigration; class AddPropertiesToSales extends Migration { /** * Run the migrations. * * @return void */ public function up() { Schema::table('sales', function($table) { $table->date('day')->unique(); $table->float('revenue'); }); } /** * Reverse the migrations. * * @return void */ public function down() { Schema::table('sales', function($table) { $table->dropColumn('day'); $table->dropColumn('revenue'); }); } }
Nun gilt es die Migrationen anzulegen. Dazu verwendest Du den Befehl Migrate. Dazu ist es aber wichtig, dass Du die Datenbankverbindung in Laravel konfiguriert hast.
$ php artisan migrate
Nach einer erfolgreichen Migration sind die vorbereiteten Spalten in der Tabelle „sales“ hinterlegt. Wir können nun damit beginnen das Model in den CSV Importer zu integrieren. Dazu öffnest Du nun wieder die angelegte Job Klasse „ImportCsvFile“. In der „handle“ Methode hinterlegst Du nun den Code um die gespeicherte CSV Datei zeilenweise auszulesen und in die passenden Models anzulegen. Wichtig ist ebenfalls, dass Du den Namespace des Sales Models einträgst.
<?php namespace AppJobs; use AppSales; use Exception; use IlluminateBusQueueable; use IlluminateQueueSerializesModels; use IlluminateQueueInteractsWithQueue; use IlluminateContractsQueueShouldQueue; use IlluminateFoundationBusDispatchable; use IlluminateSupportFacadesStorage; class ImportCsvFile implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $csvFilePath; /** * Create a new job instance. * * @param string $csvFilePath * @return void */ public function __construct($csvFilePath) { $this->csvFilePath = $csvFilePath; } /** * Execute the job. * * @throws Exception * @return void */ public function handle() { if (!Storage::disk('local')->exists($this->csvFilePath)) throw new Exception(sprintf( 'Die CSV Datei konnte nicht unter dem angegebenen Dateipfad gefunden werden. Dateipfad: "%1$s"' , $this->csvFilePath )); // Importieren der CSV Datei $fileData = Storage::get($this->csvFilePath); $csvData = str_getcsv($fileData, "n"); foreach ($csvData as &$row) { $row = str_getcsv($row, ","); if(is_array($row) && sizeof($row) == 2) { $day = date('Y-m-d', strtotime($row[0])); Sales::create([ 'day' => $day, 'revenue' => $row[1] ]); } } } }
Der in der „handle“ Methode integrierte Code ist sehr simpel und um alles kurz und knapp zu halten, nicht refaktorisiert. Diverse Blöcke lassen sich wunderbar auslagern. Desweiteren sollten an mehreren Stellen Validierungen integriert werden.
Im ersten Abschnitt der Methode validiere Ich lediglich kurz, ob die Datei im „storage“ Ordner hinterlegt wurde. Ist dass der Fall, dann wird deren Inhalt geladen und mit der PHP Methode „str_getcsv()“ als Array geparsed.
Im letzten Abschnitt werden alle Zeilen der CSV Datei durchlaufen und die Daten jeder Zeile als Sale in der Datenbank gespeichert.
Queue Listener starten
Damit die Queue nun auch abgearbeitet wird, muss der Queue Listener gestartet werden.
$ php artisan queue:listen
Im Grunde stützt sich die Abarbeitung der Queue auf diesen CLI Prozess. Sollte die Queue auf einem Server eingerichtet werden und auch nach einem Fehler oder Serverneustart wieder erreichbar sein, so lohnt es sich für Dich einen Blick auf das Programm Supervisor zu werfen. Dieses läuft unter Linux Betriebssystemen und kann schnell installiert und konfiguriert werden. Laravel bietet in der Dokumentation ein Kapiel zu Supervisor an Laravel 5.5 – Supervisor configuration.
Fazit
In diesem Blogeintrag hast Du gelernt, wie Du Deine eigene Queue gestalten und entwickeln kannst. Du hast zu Beginn des Artikels die Queue in Deinem Laravel Projekt eingerichtet und hast Deinen eigenen Queue Job erstellt. Gegen ende des Artikels hast Du noch erfahren, wie Du aus dem Queue Job heraus eine CSV Datei in Deine Datenbank importieren kannst.
Solltest Du Fragen oder Anmerkungen haben, zögere nicht einen Kommentar zu hinterlassen.