Typescript: Wie man eine seitenbasierte REST API in einen Daten-Stream umwandelt

The english version of this blog entry can be found here.

Auch wenn bei NeuroForge Typescript hauptsächlich im Frontend verwendet wird, kann es durchaus sein dass mal ein ETL Prozess für ein Data Warehouse (DW) in Typescript implementiert wird. Eine Datenquelle für so ein DW kann z.B. eine REST API sein (Hier an einer Django REST Framework API für Bilder veranschaulicht):

Bevor wir uns ansehen, wie eine solche API in REST verarbeitet wird, definieren wir uns zuerst die folgenden Typen und Funktionen:

interface Page {
    count: number;
    next: string;
    previous: string; // not really required for this example
    results: Image[];
}

interface Image {
    url: string,
    id: string,
    image: string,
    meta: any
}

function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}

Klassische Herangehensweise & Problemstellung

Wenn man unsere Beispiel API nun in Typescript auf klassische Art und Weise konsumiert, sieht das in etwa so aus:

login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);

        for(const elem of parsed) {
            console.log(elem.url);
        }

        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});

Das ist jetzt auf den ersten Blick noch sehr übersichtlich, aber was ist, wenn wir jetzt für jedes Bild einen weiteren seitenbasierten Endpunkt abfragen wollen?

login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);

        for(const elem of parsed) {
            let subPage = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
            while(subPage) {
                // do stuff with page
                const parsedSub = await parseSubItem(page);

                for(const subElem of parsedSub) {
                    console.log(elem, subElem);
                }

                if(subPage.next && subPage.next != null) {
                    page = await queryPage(session, subPage.next);
                } else {
                    page = null;
                }
            }
        }

        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});

Das Problem mit diesem Code ist: Die eigentliche Business Logik, also das was wir mit dem Programm eigentlich tun, geht unter in Boilerplate Code.

Um den Code übersichtlicher zu gestalten könnte man jetzt natürlich die innere Schleife auslagern,

login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);

        for(const elem of parsed) {
            handleSubListForElem(elem);
        }

        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});

jedoch verlagert sie das Problem nur in die Subroutine (das Verhältnis Business Logik / Boilerplate ist dasselbe). Zudem muss man auch aufpassen, dass beim Versuch den Code übersichtlicher zu machen nicht das Gegenteil tut – Stichwort “Spaghetticode”.

Alternative: Problemlösung mit iterierbaren Streams

Wenn man sich den obigen Iterationscode ansieht, fällt auf, dass die Iteration immer auf den selben Operationen aufbaut:

  1. Abfrage der ersten Seite
  2. Parsen der aktuellen Seite
  3. Überprüfung, ob eine nächste Seite vorhanden ist
  4. Weiterspringen auf die nächste Seite

Es scheint also möglich, den Iterationscode zu abstrahieren. Daher haben wir bei NeuroForge eine kleine Library namens RESTful-stream.ts entwickelt. Diese basiert auf der Überlegung, dass man Operationen 2-4 in ein Interface Control<PageType, DataType> kapseln kann:

interface Control<PageType, DataType> {
    hasNext: (page: PageType) => boolean;
    next: (page: PageType) => Promise<PageType>;
    parse: (page: PageType) => Promise<DataType[]>;
}

hasNext überprüft, ob eine weitere Seite verfügbar ist (3), next springt zur nächsten Seite (und fragt diese gleich ab, 4) und parse verarbeitet eine gegebene Seite in eine Liste von Datenpunkten (2). Das Abfragen der ersten Seite ist nicht Teil des Interfaces, sondern wird anderweitig bewerkstelligt.

Für unsere Beispiel Image API sähe eine Implementation dieses Interfaces wie folgt aus:

const imageCtrl: Control<Page, Image> = {
    hasNext(page: Page): boolean {
        return page.next != null;
    },
    next(page: Page): Promise<Page> {
        return queryPage(session, page.next);
    },
    parse(page: Page): Promise<Image[]> {
        return parseImages(page);
    }
};

Nimmt man diese Implementation und die Methode parse aus RESTful-stream.ts, kann man nun einen Parser für unsere Image Objekte definieren:

const images: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));

Ein Parser<DataType> in RESTful-stream.ts ist nichts anderes als eine Funktion () => Promise<ListChunk<DataType>>, also eine Supplier Funktion, die asynchron einen Chunk (Teil) der einer Liste (also zu Beginn die erste Seite unserer API) zurückliefert. ListChunk<T> ist hier definiert als

interface ListChunk<T> {
    next: Parser<T> | null,
    elements: T[]
}

Diese Interfaces erlauben es uns weiterhin die Definition wie wir an die Daten kommen wegzuabstrahieren, ohne eine REST Operation direkt ausgeführt zu haben. Man kann den Parser also immer wieder verwenden, und er startet jedes mal eine neue unabhängige Evaluationskette, wenn er ausgewertet wird.

Um an die Daten des Parsers zu gelangen, könnten wir den Parser nun direkt verwenden:

let cur: Parser<T> | null = () => Promise.resolve(start);
while (cur) {
    const curAwaited: ListChunk<Image> = await cur();
    
    for(const image of cur.elements) {
        console.log(element);
    }

    cur = curAwaited.next;
}

Dieses Pattern könnte man nun in eine Funktion wrappen

async function forEach<T>(parser: Parser<T>, action: (item: T) => Promise<any>): Promise<void> {
    let cur: Parser<T> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<T> = await cur();
        
        for(const elem of curAwaited.elements) {
            await action(elem);
        }
        
        cur = curAwaited.next;
    }
}

und dann ist der Iterationscode schon um einiges kürzer:

forEach(images, (image: Image) => {
    console.log(image.url);
});

Eine map<T, U>(parser: Parser<T>, mapFn: (item: T) => Promise<U>): Promise<Parser<U>> Funktion, welche weiterhin ein streamendes Verhalten haben sollte, wäre aber trotzdem nur mit größerem Aufwand zu definieren, da der ListChunk<T> Typ aus dem Parser<T> entpackt werden müsste, die Elemente gemappt und diese wieder zurück in ein ListChunk<U> in Parser<U> verpackt werden müssten.

Wenn wir aber nun von AsyncIterableIteratoren Gebrauch machen, können wir die folgende Funktion defnieren:

async function* iterate<DataType>(parser: Parser<DataType>): AsyncIterableIterator<DataType> {
    let cur: Parser<DataType> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<DataType> = await cur();
        yield* curAwaited.elements;
        cur = curAwaited.next;
    }
}

Das erlaubt uns nun, wie folgt über unsere Elemente zu iterieren:

for await(const image of iterate(images)) {
    console.log(image.url);
}

Eine lazy map Methode (mit einer etwas anderen Signatur, wir benötigen das Parser interface nicht mehr für Laziness) lässt sich nun sehr einfach definieren:

async function* map<T, U>(input: AsyncIterableIterator<T>, mapFn: (item: T) => Promise<U>): AsyncIterableIterator<U> {
    for await(const elem of input) {
        yield mapFn(elem);
    }
}

Dem aufmerksamen Leser ist hier sicherlich aufgefallen, dass iterate und map nicht 100% lazy sind, da beide direkt mit der Evaluation beginnen und soweit laufen, bis sie das erste Ergebnis produzieren können (auch wenn dabei nicht auf sie gewartet wird), sobald sie aufgerufen wurden. Wenn ein solches Verhalten gewünscht ist, sind komplett lazy funktionierende Varianten welche auf type LazyAsyncIterableIterator<T> = () => AsyncIterableIterator<T> mit wenig Aufwand zu definieren.

Zusammenfassung

Wir haben gesehen, wie man eine seitenbasierte REST API in einen Stream von Einzel-Elementen umwandeln kann indem wir zuerst die Iteration abstrahiert haben, um ein Parser interface zu definieren, welches wir anschließend in einen AsyncIterableIterator<T> umgewandelt haben. Solche Iteratoren lassen es zu, große Datenmengen wie Streams zu behandeln (welche auch als solche weiterverarbeitet werden können – siehe map) ohne dabei die komplette Liste an Elementen auf einmal aus der REST API zu ziehen.

Die vollständige Library kann mittels npm i restful-stream installiert werden, den Quellcode finden Sie auf https://github.com/neuroforgede/RESTful-stream.ts. Wir freuen uns immer über Feedback und Anmerkungen per Mail an blog@neuroforge.de.

Vollständiges Code Beispiel

const rp = require('request-promise').defaults({ simple: false });
import * as tough from 'tough-cookie';
import * as _ from 'lodash';
import {iterate, parse, Control, Parser} from 'restful-stream';

const BASE_URL = 'https://localhost:6043/api/';
const DEBUG = true;

if(DEBUG) {
    process.env["NODE_TLS_REJECT_UNAUTHORIZED"] = '0';
}

function query(params: {uri: string, body?: any, formData?: any, json?: boolean, method: string, qs?: any, cookieJar: any, headers: any}) {
    var options = {
        uri: params.uri,
        jar: params.cookieJar,
        qs: params.qs,
        method: params.method,
        body: params.body ? params.body : undefined,
        formData: params.formData,
        headers: params.headers,        
        json: params.json ? params.json : true,
        rejectUnauthorized: !DEBUG,
        insecure: DEBUG
    };
    return rp(options);
}


function endpointURL(baseUrl, endpointPath) {
    return `${baseUrl}${endpointPath}`;
}

function loginURL(baseURL) {
    return endpointURL(baseURL, 'common/auth/login/');
}

function findCookie(cookieJar, url, key) {
    const cookies = _.map(cookieJar.getCookies(url), cookie => {
        return cookie.toJSON();
    });
    return _.find(cookies, cookie => {
        return cookie.key === key;
    });
}

interface SessionInfo {
    cookieJar: tough.CookieJar,
    requiredHeaders: any
};

async function login(baseURL, user, password): Promise<SessionInfo> {
    const cookieStore = new tough.MemoryCookieStore();
    const cookieJar = rp.jar(cookieStore);

    await query({
        uri: loginURL(BASE_URL),
        cookieJar: cookieJar,
        method: 'GET',
        headers: {}
    });
    const csrfToken = findCookie(cookieJar, baseURL, 'csrftoken').value;

    return query({
        uri: loginURL(BASE_URL),
        cookieJar: cookieJar,
        method: 'POST',
        json: true,
        formData: {
            'username': 'nf',
            'password': 'nf'
        },
        headers: {
            'referer': BASE_URL,
            'X-CSRFToken': csrfToken
        }
    }).then(() => {
        return {
            cookieJar: cookieJar,
            requiredHeaders: () => {
                const newCSRFToken = findCookie(cookieJar, baseURL, 'csrftoken').value;
                return {
                    'X-CSRFToken': newCSRFToken,
                    'referer': loginURL(baseURL)
                }
            }
        };
    });
}

interface Page {
    count: number;
    next: string;
    previous: string; // not really required for this example
    results: Image[];
}

interface Image {
    url: string,
    id: string,
    image: string,
    meta: any
}

function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}

login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);

        for(const elem of parsed) {
            console.log(elem.url);
        }

        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }

    const imageCtrl: Control<Page, Image> = {
        hasNext(page: Page): boolean {
            return page.next != null;
        },
        next(page: Page): Promise<Page> {
            return queryPage(session, page.next);
        },
        parse(page: Page): Promise<Image[]> {
            return parseImages(page);
        }
    };

    const parser: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));

    // request parsed definitions
    for await(const elem of iterate(parser)) {
        console.log(elem.url);
    }
});