Even though NeuroForge uses TypeScript mainly in the frontend, it is quite possible that an ETL process for a Data Warehouse (DW) will be implemented in TypeScript. A data source for such a DW may be, for example, a REST API (illustrated here using a Django REST Framework API for images).
Before we look at how such an API is consumed in TypeScript, let's first define the following types and functions:
interface Page {
    count: number;
    next: string;
    previous: string; // not really required for this example
    results: Image[];
}

interface Image {
    url: string,
    id: string,
    image: string,
    meta: any
}

function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}
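The comment in parseImages notes that a page has to be validated to be completely safe. A minimal sketch of such a check, assuming the field layout of the Page and Image interfaces above, could be a hand-written type guard. The helpers isRecord, isImage, isPage and parseImagesChecked are hypothetical names for this illustration and are not part of RESTful-stream.ts; the sketch also assumes that next and previous are null on the last and first page:

```typescript
interface Image {
    url: string;
    id: string;
    image: string;
    meta: unknown;
}

interface Page {
    count: number;
    next: string | null;      // assumption: null on the last page
    previous: string | null;  // assumption: null on the first page
    results: Image[];
}

// Hypothetical helper: narrows an unknown value to a plain object.
function isRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null;
}

// Hypothetical helper: checks the string fields of a single image entry.
function isImage(value: unknown): value is Image {
    return isRecord(value)
        && typeof value.url === 'string'
        && typeof value.id === 'string'
        && typeof value.image === 'string';
}

// Hypothetical helper: checks the pagination envelope and every result.
function isPage(value: unknown): value is Page {
    if (!isRecord(value)) {
        return false;
    }
    const nextOk = value.next === null || typeof value.next === 'string';
    const previousOk = value.previous === null || typeof value.previous === 'string';
    const results = value.results;
    return typeof value.count === 'number'
        && nextOk
        && previousOk
        && Array.isArray(results)
        && results.every(isImage);
}

// Variant of parseImages that rejects pages with an unexpected shape.
function parseImagesChecked(page: unknown): Promise<Image[]> {
    if (!isPage(page)) {
        return Promise.reject(new Error('unexpected page format'));
    }
    return Promise.resolve(page.results);
}
```

With a guard like this, a malformed response surfaces as a rejected promise at the parsing step instead of as an undefined property access somewhere further down the pipeline.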
If you consume our example API in the classic way in TypeScript, it looks something like this:
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for(const elem of parsed) {
            console.log(elem.url);
        }
        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
At first glance, this is still very clear, but what if we now want to query another page-based endpoint for each image?
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for(const elem of parsed) {
            let subPage = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
            while(subPage) {
                // do stuff with subPage
                const parsedSub = await parseSubItem(subPage);
                for(const subElem of parsedSub) {
                    console.log(elem, subElem);
                }
                // advance the inner loop (subPage), not the outer one (page)
                if(subPage.next && subPage.next != null) {
                    subPage = await queryPage(session, subPage.next);
                } else {
                    subPage = null;
                }
            }
        }
        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
The problem here is: The actual business logic - what we want to express with our program - is buried in boilerplate code.
To make the code more readable, one might come up with this solution that moves the inner loop into a second function:
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for(const elem of parsed) {
            handleSubListForElem(elem);
        }
        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
Such a solution, however, only moves the problem into the subroutine itself (the ratio of business logic to boilerplate stays the same). We also have to be careful that, while trying to make the code more readable, we do not achieve the opposite and end up with spaghetti code.
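Looking at the iteration code above, we notice that the iteration part always uses the same building blocks:
1. Request the first page.
2. Parse the current page into a list of data points.
3. Check whether a next page is available.
4. Request the next page.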
With this in mind, it seems logical to abstract the iteration code away. For this purpose, we at NeuroForge have developed a small library named RESTful-stream.ts. The library is based on the idea that operations 2-4 can be put into a single interface Control<PageType, DataType>:
interface Control<PageType, DataType> {
    hasNext: (page: PageType) => boolean;
    next: (page: PageType) => Promise<PageType>;
    parse: (page: PageType) => Promise<DataType[]>;
}
hasNext checks whether a next page is available (3), next jumps to the next page (and already requests it, 4), and parse parses a given page into a list of data points (2). Getting the first page is not part of this interface, as it is handled differently.
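To see how the three operations fit together, here is a minimal sketch that drives a Control by hand. The collectAll helper and the in-memory NumberPage type are hypothetical and only serve as an illustration; they are not part of RESTful-stream.ts, and unlike the streaming approach developed below, this helper collects everything in memory:

```typescript
interface Control<PageType, DataType> {
    hasNext: (page: PageType) => boolean;
    next: (page: PageType) => Promise<PageType>;
    parse: (page: PageType) => Promise<DataType[]>;
}

// Hypothetical helper: walks all pages and collects every parsed element.
async function collectAll<PageType, DataType>(
    ctrl: Control<PageType, DataType>,
    first: Promise<PageType>
): Promise<DataType[]> {
    const result: DataType[] = [];
    let page = await first;                      // the first page is supplied by the caller
    while (true) {
        const items = await ctrl.parse(page);    // parse the current page
        result.push(...items);
        if (!ctrl.hasNext(page)) {               // stop when there is no further page
            return result;
        }
        page = await ctrl.next(page);            // request the next page
    }
}

// Hypothetical in-memory stand-in for a paginated REST endpoint.
interface NumberPage {
    next: number | null;   // index of the next page, null on the last page
    results: number[];
}

const fakePages: NumberPage[] = [
    { next: 1, results: [1, 2] },
    { next: 2, results: [3] },
    { next: null, results: [4, 5] }
];

const numberCtrl: Control<NumberPage, number> = {
    hasNext: page => page.next !== null,
    next: page => Promise.resolve(fakePages[page.next as number]),
    parse: page => Promise.resolve(page.results)
};
```

Calling collectAll(numberCtrl, Promise.resolve(fakePages[0])) resolves to [1, 2, 3, 4, 5]. The loop contains no knowledge about the endpoint itself, which is exactly the separation the Control interface is meant to provide.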
For our example image API, the implementation would look like this:
const imageCtrl: Control<Page, Image> = {
    hasNext(page: Page): boolean {
        return page.next != null;
    },
    next(page: Page): Promise<Page> {
        return queryPage(session, page.next);
    },
    parse(page: Page): Promise<Image[]> {
        return parseImages(page);
    }
};
Using this implementation in conjunction with parse from RESTful-stream.ts, we can now define a parser for Image objects as:
const images: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));
A Parser<DataType> in RESTful-stream.ts is simply a function () => Promise<ListChunk<DataType>>, that is, a supplier function that asynchronously produces a chunk of the list (the first page of the API). ListChunk<T> is simply defined as:
interface ListChunk<T> { next: Parser<T> | null, elements: T[] }
This interface also allows us to separate the definition of how to get Images (a Parser<Image>) from the iteration code. Since the parser itself does not run any REST request yet, we can reuse it as often as we want to start new, independent evaluations.
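How parse is implemented inside RESTful-stream.ts is not shown in this article. Purely as an illustration, a function with the same shape could be sketched as follows. The name parseSketch, the LetterPage type and the request counter are hypothetical, and the real library may well work differently:

```typescript
interface Control<PageType, DataType> {
    hasNext: (page: PageType) => boolean;
    next: (page: PageType) => Promise<PageType>;
    parse: (page: PageType) => Promise<DataType[]>;
}

type Parser<DataType> = () => Promise<ListChunk<DataType>>;

interface ListChunk<T> {
    next: Parser<T> | null;
    elements: T[];
}

// Hypothetical sketch: turns a Control plus a first page into a Parser.
function parseSketch<PageType, DataType>(
    ctrl: Control<PageType, DataType>,
    start: Promise<PageType>
): Parser<DataType> {
    return async () => {
        const page = await start;
        const elements = await ctrl.parse(page);
        // The following page is only requested once the next parser is invoked.
        const next: Parser<DataType> | null = ctrl.hasNext(page)
            ? () => parseSketch(ctrl, ctrl.next(page))()
            : null;
        return { next, elements };
    };
}

// Hypothetical in-memory pages; requestedPages counts calls to next().
interface LetterPage {
    next: number | null;
    results: string[];
}

const letterPages: LetterPage[] = [
    { next: 1, results: ['a', 'b'] },
    { next: null, results: ['c'] }
];

let requestedPages = 0;

const letterCtrl: Control<LetterPage, string> = {
    hasNext: page => page.next !== null,
    next: page => {
        requestedPages++;
        return Promise.resolve(letterPages[page.next as number]);
    },
    parse: page => Promise.resolve(page.results)
};

const letters: Parser<string> = parseSketch(letterCtrl, Promise.resolve(letterPages[0]));
```

In this sketch, awaiting letters() yields the chunk with 'a' and 'b' without calling next at all; the second page is only requested when the chunk's next parser is invoked. Invoking letters() again walks the list again from the first chunk, which mirrors the reuse described above.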
To iterate over the data in the parser, we can already use it directly:
let cur: Parser<Image> | null = images;
while (cur) {
    const curAwaited: ListChunk<Image> = await cur();
    for(const image of curAwaited.elements) {
        console.log(image);
    }
    cur = curAwaited.next;
}
We could now wrap this pattern in a function:
async function forEach<T>(parser: Parser<T>, action: (item: T) => Promise<any>): Promise<void> {
    let cur: Parser<T> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<T> = await cur();
        for(const elem of curAwaited.elements) {
            await action(elem);
        }
        cur = curAwaited.next;
    }
}
This would allow us to write quite a concise version of our iteration:
// the callback is async so that it matches the Promise-returning action type
forEach(images, async (image: Image) => {
    console.log(image.url);
});
The problem with this approach, though, is that a function map<T, U>(parser: Parser<T>, mapFn: (item: T) => Promise<U>): Promise<Parser<U>> that still has streaming properties (notice the return type: we do not want to load everything into memory) would be non-trivial to implement. We would have to unwrap the ListChunk<T> from the Parser<T> and put the mapped elements back into a ListChunk<U> wrapped in a Parser<U>.
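To make the unwrapping and re-wrapping concrete, here is a rough sketch of such a map on top of Parser. It is an illustration under simplifying assumptions rather than library code: the name mapParser is hypothetical, and for simplicity it returns a Parser<U> directly instead of the Promise<Parser<U>> from the signature above:

```typescript
type Parser<T> = () => Promise<ListChunk<T>>;

interface ListChunk<T> {
    next: Parser<T> | null;
    elements: T[];
}

// Hypothetical sketch: maps a Parser<T> to a Parser<U> chunk by chunk.
function mapParser<T, U>(parser: Parser<T>, mapFn: (item: T) => Promise<U>): Parser<U> {
    return async () => {
        const chunk = await parser();              // unwrap the ListChunk<T>
        const elements: U[] = [];
        for (const item of chunk.elements) {
            elements.push(await mapFn(item));      // map one element at a time
        }
        // Re-wrap: the rest of the list is only mapped when it is requested.
        const next = chunk.next ? mapParser(chunk.next, mapFn) : null;
        return { next, elements };
    };
}

// Hypothetical in-memory parser consisting of two chunks.
const lastChunk: ListChunk<number> = { elements: [3], next: null };
const firstChunk: ListChunk<number> = { elements: [1, 2], next: async () => lastChunk };
const numbers: Parser<number> = async () => firstChunk;

const doubled: Parser<number> = mapParser(numbers, async n => n * 2);
```

Even in this reduced form, every further operation (filter, take, and so on) would need the same recursive unwrap and re-wrap logic, which is the overhead the paragraph above refers to.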
Alternatively, if we use AsyncIterableIterators, we can define the following function:
async function* iterate<DataType>(parser: Parser<DataType>): AsyncIterableIterator<DataType> {
    let cur: Parser<DataType> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<DataType> = await cur();
        yield* curAwaited.elements;
        cur = curAwaited.next;
    }
}
This allows us to iterate over the elements as follows:
for await(const image of iterate(images)) { console.log(image.url); }
A lazy map function (with a different signature, since we do not need the Parser interface for laziness) is now extremely easy to define:
async function* map<T, U>(input: AsyncIterableIterator<T>, mapFn: (item: T) => Promise<U>): AsyncIterableIterator<U> {
    for await(const elem of input) {
        yield mapFn(elem);
    }
}
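The following sketch combines iterate and map on a small in-memory parser to show the streaming behaviour. The parser letters, the counter loadedChunks and the helper firstUpperCased are hypothetical names made up for this illustration; iterate and map are repeated from above so that the block is self-contained:

```typescript
type Parser<T> = () => Promise<ListChunk<T>>;

interface ListChunk<T> {
    next: Parser<T> | null;
    elements: T[];
}

async function* iterate<T>(parser: Parser<T>): AsyncIterableIterator<T> {
    let cur: Parser<T> | null = parser;
    while (cur) {
        const chunk: ListChunk<T> = await cur();
        yield* chunk.elements;
        cur = chunk.next;
    }
}

async function* map<T, U>(input: AsyncIterableIterator<T>, mapFn: (item: T) => Promise<U>): AsyncIterableIterator<U> {
    for await (const elem of input) {
        yield mapFn(elem);
    }
}

// Hypothetical in-memory parser: two chunks, counting how many were loaded.
let loadedChunks = 0;

const secondChunk: Parser<string> = async () => {
    loadedChunks++;
    return { elements: ['c', 'd'], next: null };
};

const letters: Parser<string> = async () => {
    loadedChunks++;
    return { elements: ['a', 'b'], next: secondChunk };
};

// Hypothetical helper: upper-cases elements and stops after `limit` results.
async function firstUpperCased(limit: number): Promise<string[]> {
    const out: string[] = [];
    for await (const letter of map(iterate(letters), async s => s.toUpperCase())) {
        out.push(letter);
        if (out.length >= limit) {
            break;   // leaving the loop also stops the underlying generators
        }
    }
    return out;
}
```

Awaiting firstUpperCased(2) resolves to ['A', 'B'] while loadedChunks stays at 1: because the loop is left before the first chunk is exhausted, the second chunk is never requested.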
Note: whether the whole pipeline is 100% lazy depends on how it is set up. The bodies of async generator functions such as iterate and map only start running once the first element is requested, but their arguments are evaluated as soon as the pipeline is built: in the example above, queryPage is called at the moment the parser is defined, even though we do not wait for its result. If fully lazy behaviour is required, it is easy to define a lazy variant based on a type LazyAsyncIterableIterator<T> = () => AsyncIterableIterator<T>.
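A lazy variant along these lines could look roughly as follows. This is only a sketch under the assumption that deferring the creation of the iterator is all that is needed; lazyMap, source and the started flag are hypothetical names and not part of RESTful-stream.ts:

```typescript
type LazyAsyncIterableIterator<T> = () => AsyncIterableIterator<T>;

// Hypothetical sketch: the input iterator is only created when the
// returned function is called and its result is iterated.
function lazyMap<T, U>(
    input: LazyAsyncIterableIterator<T>,
    mapFn: (item: T) => Promise<U>
): LazyAsyncIterableIterator<U> {
    async function* run(): AsyncIterableIterator<U> {
        for await (const elem of input()) {
            yield mapFn(elem);
        }
    }
    return run;
}

// Hypothetical in-memory source that records when it starts producing values.
let started = false;

async function* source(): AsyncIterableIterator<number> {
    started = true;
    yield 1;
    yield 2;
}

const squares: LazyAsyncIterableIterator<number> = lazyMap(source, async n => n * n);
```

Defining squares does not touch source at all; only iterating over squares() produces the values 1 and 4 and sets started to true. Because the result is again a function, the same definition can be evaluated several times.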
We have seen how to transform a REST API into a stream of elements: we first abstracted the iteration, then defined a Parser interface, and finally turned it into an AsyncIterableIterator<T>. These iterators allow us to treat huge amounts of data as streams (which we can even transform as such, see map) without having to load the complete list of elements into memory.
The complete library can be installed via npm i restful-stream; the source code can be found at https://github.com/neuroforgede/RESTful-stream.ts. If you have any feedback or remarks, feel free to contact us via e-mail at blog@neuroforge.de.
const rp = require('request-promise').defaults({ simple: false });
import * as tough from 'tough-cookie';
import * as _ from 'lodash';
import {iterate, parse, Control, Parser} from 'restful-stream';

const BASE_URL = 'https://localhost:6043/api/';
const DEBUG = true;
if(DEBUG) {
    // accept self-signed certificates while debugging
    process.env["NODE_TLS_REJECT_UNAUTHORIZED"] = '0';
}

function query(params: {uri: string, body?: any, formData?: any, json?: boolean, method: string, qs?: any, cookieJar: any, headers: any}) {
    var options = {
        uri: params.uri,
        jar: params.cookieJar,
        qs: params.qs,
        method: params.method,
        body: params.body ? params.body : undefined,
        formData: params.formData,
        headers: params.headers,
        json: params.json ? params.json : true,
        rejectUnauthorized: !DEBUG,
        insecure: DEBUG
    };
    return rp(options);
}

function endpointURL(baseUrl, endpointPath) {
    return `${baseUrl}${endpointPath}`;
}

function loginURL(baseURL) {
    return endpointURL(baseURL, 'common/auth/login/');
}

function findCookie(cookieJar, url, key) {
    const cookies = _.map(cookieJar.getCookies(url), cookie => {
        return cookie.toJSON();
    });
    return _.find(cookies, cookie => {
        return cookie.key === key;
    });
}

interface SessionInfo {
    cookieJar: tough.CookieJar,
    requiredHeaders: any
};

async function login(baseURL, user, password): Promise<SessionInfo> {
    const cookieStore = new tough.MemoryCookieStore();
    const cookieJar = rp.jar(cookieStore);
    // the initial GET sets the CSRF cookie
    await query({
        uri: loginURL(baseURL),
        cookieJar: cookieJar,
        method: 'GET',
        headers: {}
    });
    const csrfToken = findCookie(cookieJar, baseURL, 'csrftoken').value;
    return query({
        uri: loginURL(baseURL),
        cookieJar: cookieJar,
        method: 'POST',
        json: true,
        // use the credentials passed in instead of hard-coded values
        formData: {
            'username': user,
            'password': password
        },
        headers: {
            'referer': baseURL,
            'X-CSRFToken': csrfToken
        }
    }).then(() => {
        return {
            cookieJar: cookieJar,
            requiredHeaders: () => {
                const newCSRFToken = findCookie(cookieJar, baseURL, 'csrftoken').value;
                return {
                    'X-CSRFToken': newCSRFToken,
                    'referer': loginURL(baseURL)
                }
            }
        };
    });
}

interface Page {
    count: number;
    next: string;
    previous: string; // not really required for this example
    results: Image[];
}

interface Image {
    url: string,
    id: string,
    image: string,
    meta: any
}

function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}

login(BASE_URL, 'nf', 'nf').then(async session => {
    // classic approach: manual pagination loop
    let page = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while(page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for(const elem of parsed) {
            console.log(elem.url);
        }
        if(page.next && page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }

    // stream-based approach with RESTful-stream.ts
    const imageCtrl: Control<Page, Image> = {
        hasNext(page: Page): boolean {
            return page.next != null;
        },
        next(page: Page): Promise<Page> {
            return queryPage(session, page.next);
        },
        parse(page: Page): Promise<Image[]> {
            return parseImages(page);
        }
    };

    const parser: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));

    // request parsed definitions
    for await(const elem of iterate(parser)) {
        console.log(elem.url);
    }
});