Even though NeuroForge mainly uses TypeScript in the frontend, it can happen that an ETL process for a data warehouse (DW) is implemented in TypeScript. A data source for such a DW can be, for example, a REST API (illustrated here with a Django REST Framework API for images).
Before we look at how such an API is consumed in TypeScript, we first define the following types and functions:
interface Page {
    count: number;
    next: string | null;
    previous: string | null; // not really required for this example
    results: Image[];
}

interface Image {
    url: string;
    id: string;
    image: string;
    meta: any;
}
function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}
Classic approach & problem definition
If you now consume our example API in TypeScript in the classic way, it looks something like this:
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while (page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for (const elem of parsed) {
            console.log(elem.url);
        }
        if (page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
At first glance this is still fairly readable, but what if we now want to query another paginated endpoint for each image?
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while (page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for (const elem of parsed) {
            // query the paginated sub-endpoint for this element
            // (the image endpoint is reused here purely for illustration)
            let subPage: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
            while (subPage) {
                // do stuff with subPage
                const parsedSub = await parseSubItem(subPage);
                for (const subElem of parsedSub) {
                    console.log(elem, subElem);
                }
                if (subPage.next != null) {
                    subPage = await queryPage(session, subPage.next);
                } else {
                    subPage = null;
                }
            }
        }
        if (page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
The problem with this code is that the actual business logic, i.e. what the program actually does, gets lost in boilerplate code.
To make the code clearer, one could of course extract the inner loop into its own function,
login(BASE_URL, 'nf', 'nf').then(async session => {
    let page: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while (page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for (const elem of parsed) {
            await handleSubListForElem(session, elem);
        }
        if (page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }
});
but that only shifts the problem into the subroutine (the ratio of business logic to boilerplate stays the same). In addition, one has to be careful that the attempt to make the code clearer does not achieve the opposite, keyword: "spaghetti code".
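For illustration, handleSubListForElem could look roughly like this (a sketch only; as above, the image endpoint stands in for the actual sub-endpoint):

async function handleSubListForElem(session: SessionInfo, elem: Image): Promise<void> {
    // the same query/parse/next boilerplate, just moved into a helper
    let subPage: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while (subPage) {
        const parsedSub = await parseSubItem(subPage);
        for (const subElem of parsedSub) {
            console.log(elem, subElem);
        }
        subPage = subPage.next != null ? await queryPage(session, subPage.next) : null;
    }
}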
Alternative: solving the problem with iterable streams
If you look at the iteration code above, you will notice that the iteration is always built on the same operations:
1. Querying the first page
2. Parsing the current page
3. Checking whether there is a next page
4. Jumping to the next page
It therefore seems possible to abstract the iteration code away. For this purpose we at NeuroForge have developed a small library called RESTful-stream.ts. It is based on the idea that operations 2-4 can be encapsulated in an interface Control<PageType, DataType>:
interface Control<PageType, DataType> {
    hasNext: (page: PageType) => boolean;
    next: (page: PageType) => Promise<PageType>;
    parse: (page: PageType) => Promise<DataType[]>;
}
hasNext checks whether another page is available (3), next jumps to the next page (and queries it immediately, 4), and parse processes a given page into a list of data points (2). Querying the first page (1) is not part of the interface; it happens elsewhere.
For our example image API, an implementation of this interface would look like this (session is the SessionInfo obtained from the login shown in the complete example below):
const imageCtrl: Control<Page, Image> = {
    hasNext(page: Page): boolean {
        return page.next != null;
    },
    next(page: Page): Promise<Page> {
        // only called when hasNext(page) is true, so page.next is non-null here
        return queryPage(session, page.next!);
    },
    parse(page: Page): Promise<Image[]> {
        return parseImages(page);
    }
};
Taking this implementation and the function parse from RESTful-stream.ts, we can now define a parser for our image objects:
const images: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));
A Parser<DataType> in RESTful-stream.ts is nothing more than a function () => Promise<ListChunk<DataType>>, i.e. a supplier function that asynchronously returns a chunk (part) of a list (initially, the first page of our API). ListChunk<T> is defined as
interface ListChunk<T> {
    next: Parser<T> | null;
    elements: T[];
}
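Spelled out as a type alias, Parser then corresponds to the following (reconstructed from the description above; the actual declaration in RESTful-stream.ts may differ in detail):

type Parser<T> = () => Promise<ListChunk<T>>;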
These interfaces also allow us to abstract away how we obtain the data without immediately performing a REST request. The parser can therefore be reused: every time it is evaluated, a new, independent evaluation chain is started.
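For example (inside an async context), the same parser could be evaluated more than once, each call yielding a fresh chunk for the first page; a small sketch using the images parser defined above:

const firstChunk = await images();      // first evaluation of the first page
const firstChunkAgain = await images(); // a second, independent evaluation chain
console.log(firstChunk.elements.length, firstChunkAgain.elements.length);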
To get at the data, we could now use the parser directly:
let cur: Parser<Image> | null = images;
while (cur) {
    const curAwaited: ListChunk<Image> = await cur();
    for (const image of curAwaited.elements) {
        console.log(image.url);
    }
    cur = curAwaited.next;
}
This pattern could now be wrapped in a function
async function forEach<T>(parser: Parser<T>, action: (item: T) => Promise<any>): Promise<void> {
    let cur: Parser<T> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<T> = await cur();
        for (const elem of curAwaited.elements) {
            await action(elem);
        }
        cur = curAwaited.next;
    }
}
and then the iteration code is already a lot shorter:
forEach(images, async (image: Image) => {
    console.log(image.url);
});
A map<T, U>(parser: Parser<T>, mapFn: (item: T) => Promise<U>): Promise<Parser<U>> function that preserves the streaming behaviour would, however, take noticeably more effort to define, since the ListChunk<T> would have to be unpacked from the Parser<T>, its items mapped, and the results packed back into a ListChunk<U> inside a Parser<U>.
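To see why, here is a rough sketch of such a streaming map over Parser<T> (illustrative only; mapParser is a hypothetical name, the return type is simplified to Parser<U> instead of Promise<Parser<U>>, and this is not part of RESTful-stream.ts):

function mapParser<T, U>(parser: Parser<T>, mapFn: (item: T) => Promise<U>): Parser<U> {
    return async () => {
        // unpack the current chunk ...
        const chunk: ListChunk<T> = await parser();
        // ... map its elements ...
        const mapped: U[] = await Promise.all(chunk.elements.map(mapFn));
        // ... and repack everything, deferring the rest of the list
        return {
            elements: mapped,
            next: chunk.next ? mapParser(chunk.next, mapFn) : null
        };
    };
}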
However, if we now make use of AsyncIterableIterators, we can define the following function:
async function* iterate<DataType>(parser: Parser<DataType>): AsyncIterableIterator<DataType> {
    let cur: Parser<DataType> | null = parser;
    while (cur) {
        const curAwaited: ListChunk<DataType> = await cur();
        yield* curAwaited.elements;
        cur = curAwaited.next;
    }
}
This now allows us to iterate over our elements as follows:
for await (const image of iterate(images)) {
    console.log(image.url);
}
A lazy map function (with a slightly different signature; for laziness we no longer need the Parser interface) can now be defined very easily:
async function* map<T, U>(input: AsyncIterableIterator<T>, mapFn: (item: T) => Promise<U>): AsyncIterableIterator<U> {
    for await (const elem of input) {
        yield mapFn(elem);
    }
}
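Used together with iterate, this could look roughly as follows (a small sketch that lazily maps every image to its URL):

const urls = map(iterate(images), async (image: Image) => image.url);
for await (const url of urls) {
    console.log(url);
}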
The attentive reader may have noticed that this construction is not 100% lazy: the first page request is already fired when the parser is built (the queryPage promise passed to parse starts immediately), and once iteration begins, each step runs until it can produce its next result, even if that result is never awaited. If completely lazy behaviour is desired, such variants can be defined with little effort using type LazyAsyncIterableIterator<T> = () => AsyncIterableIterator<T>.
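A sketch of what such a lazy variant could look like (illustrative; lazyImages is a hypothetical name, and session and imageCtrl are the values from the example above):

type LazyAsyncIterableIterator<T> = () => AsyncIterableIterator<T>;

// nothing, not even the initial queryPage request, happens before
// the returned function is actually invoked
const lazyImages: LazyAsyncIterableIterator<Image> = () =>
    iterate(parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'))));

for await (const image of lazyImages()) {
    console.log(image.url);
}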
Summary
We have seen how a paginated REST API can be transformed into a stream of individual elements: we first abstracted the iteration into a Parser interface and then converted that into an AsyncIterableIterator<T>. Such iterators allow us to handle large amounts of data like streams (which can also be processed further as such, see map) without fetching the complete list of elements from the REST API at once.
The complete library can be installed using npm i restful-stream, and the source code can be found at https://github.com/neuroforgede/RESTful-stream.ts. We always appreciate feedback and comments via email at blog@neuroforge.de.
Complete code example:
const rp = require('request-promise').defaults({ simple: false });
import * as tough from 'tough-cookie';
import * as _ from 'lodash';
import { iterate, parse, Control, Parser } from 'restful-stream';

const BASE_URL = 'https://localhost:6043/api/';
const DEBUG = true;

if (DEBUG) {
    process.env["NODE_TLS_REJECT_UNAUTHORIZED"] = '0';
}

function query(params: {uri: string, body?: any, formData?: any, json?: boolean, method: string, qs?: any, cookieJar: any, headers: any}) {
    const options = {
        uri: params.uri,
        jar: params.cookieJar,
        qs: params.qs,
        method: params.method,
        body: params.body ? params.body : undefined,
        formData: params.formData,
        headers: params.headers,
        json: params.json ? params.json : true,
        rejectUnauthorized: !DEBUG,
        insecure: DEBUG
    };
    return rp(options);
}
function endpointURL(baseUrl: string, endpointPath: string): string {
    return `${baseUrl}${endpointPath}`;
}

function loginURL(baseURL: string): string {
    return endpointURL(baseURL, 'common/auth/login/');
}

function findCookie(cookieJar: any, url: string, key: string) {
    const cookies = _.map(cookieJar.getCookies(url), cookie => {
        return cookie.toJSON();
    });
    return _.find(cookies, cookie => {
        return cookie.key === key;
    });
}
interface SessionInfo {
    cookieJar: tough.CookieJar;
    requiredHeaders: () => any;
}
async function login(baseURL: string, user: string, password: string): Promise<SessionInfo> {
    const cookieStore = new tough.MemoryCookieStore();
    const cookieJar = rp.jar(cookieStore);
    // initial GET to obtain a CSRF cookie
    await query({
        uri: loginURL(baseURL),
        cookieJar: cookieJar,
        method: 'GET',
        headers: {}
    });
    const csrfToken = findCookie(cookieJar, baseURL, 'csrftoken').value;
    return query({
        uri: loginURL(baseURL),
        cookieJar: cookieJar,
        method: 'POST',
        json: true,
        formData: {
            'username': user,
            'password': password
        },
        headers: {
            'referer': baseURL,
            'X-CSRFToken': csrfToken
        }
    }).then(() => {
        return {
            cookieJar: cookieJar,
            requiredHeaders: () => {
                const newCSRFToken = findCookie(cookieJar, baseURL, 'csrftoken').value;
                return {
                    'X-CSRFToken': newCSRFToken,
                    'referer': loginURL(baseURL)
                };
            }
        };
    });
}
interface Page {
    count: number;
    next: string | null;
    previous: string | null; // not really required for this example
    results: Image[];
}

interface Image {
    url: string;
    id: string;
    image: string;
    meta: any;
}

function parseImages(page: Page): Promise<Image[]> {
    // do actual parsing here, omitted for brevity
    // page has to be validated to be completely safe
    return Promise.resolve(page.results);
}

function queryPage(sessionInfo: SessionInfo, url: string): Promise<Page> {
    return query({
        uri: url,
        cookieJar: sessionInfo.cookieJar,
        method: 'GET',
        headers: sessionInfo.requiredHeaders()
    });
}
login(BASE_URL, 'nf', 'nf').then(async session => {
    // classic approach
    let page: Page | null = await queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/'));
    while (page) {
        // do stuff with page
        const parsed = await parseImages(page);
        for (const elem of parsed) {
            console.log(elem.url);
        }
        if (page.next != null) {
            page = await queryPage(session, page.next);
        } else {
            page = null;
        }
    }

    // stream-based approach using RESTful-stream.ts
    const imageCtrl: Control<Page, Image> = {
        hasNext(page: Page): boolean {
            return page.next != null;
        },
        next(page: Page): Promise<Page> {
            // only called when hasNext(page) is true, so page.next is non-null here
            return queryPage(session, page.next!);
        },
        parse(page: Page): Promise<Image[]> {
            return parseImages(page);
        }
    };

    const parser: Parser<Image> = parse(imageCtrl, queryPage(session, endpointURL(BASE_URL, 'image-api/public/data/image/')));
    // iterate over all parsed images
    for await (const elem of iterate(parser)) {
        console.log(elem.url);
    }
});