AUTOR Matthias vom Bruch

Radikale Reaktivität in Angular Teil 2

Komposition

Dieser Artikel ist Teil einer Serie, in der ich versuche, mentale Modelle zu vermitteln, die zu gutem RxJS-Code führen. Ich empfehle, sie der Reihe nach zu lesen. Anhand des Codes einer Demo-Anwendung werden häufige Probleme in RxJS aufgezeigt und Verbesserungsmöglichkeiten vorgeschlagen.

Was sind Promises, was sind Observables?

Ein Observable ist für ein Promise das, was ein Array für einen einzelnen Wert ist.

 

Ein einzelner Wert ist ein Wert im RAM. Ein Array sind mehrere Werte, die im RAM angeordnet sind.

 

Eine Promise ist ein Wert in der Zeit. Ein Observable sind mehrere zeitlich angeordnete Werte.

 

Promises wurden eingeführt, um Daten zu verarbeiten, die asynchron ankommen, wie bei einer HTTP-Anfrage. Sie fordern sie an und wissen, dass sie irgendwann ankommen werden, aber Sie wissen nicht, wann, und wollen in der Zwischenzeit andere Dinge tun. Deshalb verwenden Sie eine Promise, die geduldig auf das Eintreffen der Daten wartet und dann mit ihnen das tut, was der Callback, den Sie der Methode then übergeben, vorgibt.

 

Promises sind nicht in der Lage, mehrere Datenpakete zu modellieren, die nacheinander ankommen. Sie liefern nur genau einmal Daten. Dann sind sie fertig. Was aber, wenn Sie einen Websocket haben, das viele, viele Male Daten liefern kann?

 

Observables werden verwendet, wenn man weiß, dass Daten irgendwann ankommen werden, vielleicht sogar mehrmals, man aber nicht weiß, wann, und in der Zwischenzeit andere Dinge tun möchte. Sie verwenden also ein Observable, das geduldig darauf wartet, dass Ihre Daten eintreffen, und dann mit ihnen das tut, was es durch den Callback, den Sie an seine subscribe Methode übergeben,  zu tun angewiesen wurde.

 

Da Observables potenziell lange existieren und agieren können, ermöglichen sie es uns, unsere Anwendung als ein einziges, riesiges Ereignisnetzwerk zu modellieren, in dem Daten frei fließen, sobald sie verfügbar sind. Dies zu tun, ist eine kleine Kunst. Ich werde versuchen, Ihnen einige der Dinge beizubringen, die ich gelernt habe und die helfen, schöne (und nützliche) Netzwerke zu entwerfen.

Anti-Pattern 1: Imperative Ausführung von Aktionen und Behandlung von Observables wie Promises.

Der folgende Code zeigt, wie wir eine Funktion im Konstruktor unserer Komponente ausführen, die einen Serviceaufruf tätigt, der ein Observable zurückgibt. Es wird abonniert und die im Callback erhaltenen Daten verwendet, um die Felder this.clients und this.freeCars zu befüllen.

// Code, der imperativ den Zustand aktualisiert und ein Observable wie eine Promise behandelt.
// ...
  constructor(
    private readonly dataService: MockDataService,
  ) {
    this.refreshState();
  }


  refreshState() {
    this.dataService.getState().subscribe(state => {
      this.clients = state.users.map(user => ({ user, cars: [] }));
      this.freeCars = state.cars;
      this.clients?.forEach(client => {
        if (!client.user.id) {
          return;
        }
        state.userCarBindings.forEach(binding => {
          if (binding.userId === client.user.id) {
            const car = state.cars.find(c => c.id === binding.carId);
            if (car) {
              const index = this.freeCars?.findIndex(c => c.id === car?.id);
              if (index !== undefiniert && index !== -1) {
                this.freeCars?.splice(index, 1);
              }
              client.cars.push(car);
            }
          }
        });
      });
    });
  }
// ...

Warum ist dieser Code problematisch?

 

Weil er die Annahme macht, dass das Observable von this.dataService.getState() nur einmal Daten liefert und dann abgeschlossen ist. Dies könnte der Fall sein und ist es hier auch, denn MockDataService gibt vor, nur HTTP-Aufrufe vorzubereiten und das resultierende Observable an den Aufrufer zu übergeben. Aber das wissen wir nur, wenn wir den Service inspizieren. Standardmäßig müssen wir davon ausgehen, dass Observables im Laufe ihrer Lebensdauer viele, viele Werte erzeugen. Selbst wenn wir den Dienst inspizieren und herausfinden, dass er sich wie eine Promise verhält und wir wissen, dass wir auf der sicheren Seite sind, haben wir keine Garantie, dass sich das Verhalten des Services niemals ändern wird. Und wenn es das tut, merken wir es vielleicht nicht. Die Signatur der Methode würde sich nicht ändern. Und schon haben wir zufällige Zustandsaktualisierungen und in einigen Fällen auch Memory Leaks.

 

Ein typischer Weg, diese Probleme zu vermeiden, besteht darin, selbst sicherzustellen, dass wir nur einen Wert erhalten, indem wir einen Operator wie first() verwenden.

this.dataService.getState().pipe(first()).subscribe(/*...*/);

Wir müssen aber nicht zwingend solch imperative Methoden verwenden. Anstatt ständig Daten zu abzurufen, zu ändern, zu aktualisieren, zu löschen und so weiter, definieren wir einfach die Ursache und Wirkung einer bestimmten Aktion und lassen die Ereignisse frei fließen. Wir machen nicht mehr die Arbeit, sondern wir definieren den Prozess. Das ist es, worum es bei der Verwendung von RxJS geht.

 

Nehmen wir this.clients als Beispiel. In der naiven Version ist es wie folgt definiert:

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent {
clients: { user: User, cars: Car[] }[];

// ...
}

und wird dann von mehreren verschiedenen Stellen in der Komponente aktualisiert. Das macht es schwierig, alle Dinge zu erfassen, die die clients betreffen könnten.

// Einige Ausschnitte des imperativen Codes, der die Clients beeinflusst.
// ...
export class AppComponent {

  clients?: { user: User, cars: Car[] }[];
// ...
refreshState() { this.dataService.getState().subscribe(state => { this.clients = state.users.map(user => ({ user, cars: [] }))); // ... }); } private assignCarIfPossible() { // ... this.clients?.forEach(client => { if (client.user.id === binding.userId) { const car = this.freeCars?.find(car => car.id === binding.carId); if (car) { client.cars.push(car); } } }); // ... } removeCarFromUser($event: MouseEvent, car: Car, user: User) { // ... this.clients?.forEach(client => { if (client.user.id === user.id) { client.cars = client.cars.filter(c => c.id !== car.id); this.freeCars?.push(car); } }) // ... } userSubmit($event: Event) { // ... this.clients[clientIndex] = { user: { id: clientId, firstName, lastName, email }, cars: this.clients[clientIndex].cars || [] } // ... this.clients?.push({ user: { id, firstName, lastName, email }, cars: [] }); // ... } userRemove($event: Event) { // ... this.clients = users.map(user => ({ user, cars: [] })); this.clients?.forEach(client => { if (!client.user.id) { return; } this.dataService.getCarsByUserId(client.user.id).subscribe(cars => { client.cars = cars; }); // ... } carRemove($event: Event) { // ... this.refreshState(); // ... } }

Wie könnten wir das besser machen? Indem wir nicht ein clients Feld haben, das ein Array ist. Stattdessen verwenden wir ein clients$ Observable eines clients Arrays, das so definiert ist, dass es direkt zeigt, woher sein Wert kommen kann:

// Alles, was Sie über das Clients Observable wissen müssen, finden Sie in seiner Initialisierung.

// ...
  readonly clients$ = merge(
    this.refresh$.pipe(
      startWith(undefined), // make this trigger immediately
      switchMap(() => this.dataService.getAllUsers().pipe(
        switchMap(users => forkJoin(users.map(user =>
          this.dataService.getCarsByUserId(user.id).pipe(
            map(cars => ({ user, cars }))
          )
        )))
      )),
      map(clients => (arg: unknown) => clients)
    ),
    this.carRemoved$.pipe(
      filter((carId): carId is string => typeof carId === 'string'), // filter errors
      map(carId => (clients: Client[]) => clients.map(client => ({
        ...client,
        cars: client.cars.filter(car => car.id !== carId)
      })))
    ),
    this.carUpdated$.pipe(
      filter((car): car is Car => typeof car !== 'number'), // filter errors
      map(car => (clients: Client[]) => clients.map(client => ({
        ...client,
        cars: client.cars.map(c => c.id === car.id ? car : c)
      })))
    ),
    this.userAdded$.pipe(
      filter((user): user is User => typeof user !== 'number'),
      map(user => (clients: Client[]) => [...clients, { user, cars: [] }])
    ),
    this.userUpdated$.pipe(
      filter((user): user is User => typeof user !== 'number'),
      map(user => (clients: Client[]) => clients.map(client => client.user.id === user.id ? { user, cars: client.cars } : client))
    ),
    this.userRemoved$.pipe(
      filter((userId): userId is string => typeof userId === 'string'),
      map(userId => (clients: Client[]) => clients.filter(client => client.user.id !== userId))
    ),
    this.carRemovedFromUser$.pipe(
      filter((arg): arg is { car: Auto, Benutzer: User } => typeof arg !== 'number'),
      map(({ car, user }) => (clients: Client[]) => clients.map(client => client.user.id === user.id ? {
        user: client.user,
        cars: client.cars.filter(c => c.id !== car.id)
      } : client))
    ),
    this.carAssignedToUser$.pipe(
      filter((arg): arg is { car: Auto, Benutzer: User } => typeof arg !== 'number'),
      map(({ car, user }) => (clients: Client[]) => clients.map(client => client.user.id === user.id ? {
        user: client.user,
        cars: [...client.cars, car]
      } : client))
    ),
  ).pipe(
    scan((clients, modifier) => modifier(clients), this.initialClients), // update state
    startWith(this.initialClients),
    shareReplay(1),
  );
 // ...

Okay, jetzt sehen Sie, was ich meine, wenn ich sage, dass Code, der RxJS verwendet, ganz anders aussieht. Das mag zunächst erschlagend sein, also lassen Sie es uns aufschlüsseln.

 

Der merge-Operator wird verwendet, um mehrere Observables zu einem einzigen zu kombinieren. Stellen Sie sich das so vor, als ob mehrere Fahrspuren auf einer Autobahn zu einer einzigen zusammengeführt werden. Immer wenn eines der Input Observables feuert, wird dieser Wert von dem resultierenden aggregierten Observable gefeuert. Wir können sofort sehen, was clients$ potentiell verändern kann. Es sind refresh$, carRemoved$, carUpdated$, userAdded$, userUpdated$, userRemoved$, carRemovedFromUser$, und carAssignedToUser$. Nichts anderes kann clients$ beeinflussen.

 

Natürlich haben alle diese Observables sehr unterschiedliche Typen - sie liefern sehr unterschiedliche Daten - die meisten von ihnen überhaupt keine Clients. Sie müssten sich diesen Code in der IDE Ihrer Wahl ansehen, dann werden Sie feststellen, dass die meisten von ihnen entweder einen von mehreren Fehlern oder Daten unterschiedlicher Art liefern. Wir sind (hier) nicht an Fehlern interessiert, also filtern wir sie heraus.

Der nächste Teil ist knifflig. Wir haben in der Tat eine kleine State-Machine aufgebaut, indem wir den scan-Operator in der letzten Pipe verwenden. scan ist für ein Observable das, was reduce für ein Array ist. Es erlaubt uns, einen akkumulierten Wert aus allen Eingabewerten zu berechnen, die bisher eingetroffen sind. Kombinieren Sie dies mit einer StateModifier Funktion:

type StateModifier<T> = (state: T) => T;

die wie ein Reducer in populären Zustandsverwaltungsbibliotheken wie NgRx agiert, und Sie haben eine Möglichkeit, den Zustand mit beliebigen Eingabeereignissen auf dem neuesten Stand zu halten.

 

Jedes Input Observable wird auf eine Funktion gemappt, die die Type Constraint StateModifier<Client[]> erfüllt. Diese Funktion aktualisiert den akkumulierten Zustand, indem sie die vom Input Observable empfangenen Daten verwendet, um das Zustandsobjekt zu aktualisieren. Der Zustand wird mit dem Operator startWith mit einem Anfangswert gestartet und mit shareReplay(1) geteilt und wiedergegeben. Auf diese Weise erhält jeder neue Abonnent sofort den letzten Wert, ohne auf ein Ereignis warten zu müssen, das ausgelöst wird.

Eine mentale Krücke

Bei RxJS-Code ist es oft hilfreich, die Logik vom Ende zum Anfang hin zu konstruieren. Überlegen Sie, was Sie am Ende erreichen wollen, schreiben Sie ein Observable dieses Datentyps, und sehen Sie dann, welche Ereignisse zur Bildung dieser Daten führen. Verwenden Sie die Kompositionsoperatoren von RxJS, insbesondere merge, combineLatest, withLatestFrom, forkJoin und andere, um die verschiedenen Daten-/Ereignisquellen so zusammenzufügen, dass das gewünschte Endergebnis entsteht. Auf diese Weise vermeiden Sie, Dinge zu bauen, die Sie am Ende nicht brauchen.