var fs = require('fs')
var es = require('event-stream')
var dayjs = require('dayjs')
var axios = require('axios')
var customParseFormat = require('dayjs/plugin/customParseFormat')
dayjs.extend(customParseFormat)
// Text of the batch currently being accumulated; emptied after each send.
let data = ''
// Number of lines counted since the last batch was sent.
let lineCount = 0
// Number of lines counted since the start of the file (0 means first line).
let lineTotal = 0
/**
 * Left-pads the string form of a value until it is `width` characters long.
 *
 * @param {*} n - Value to pad; it is converted to a string first.
 * @param {number} width - Minimum length of the result.
 * @param {string} [z] - Filler repeated once per missing character;
 *   any falsy value falls back to '0'.
 * @returns {string} The padded string, or the unchanged string form of `n`
 *   when it is already at least `width` characters long.
 */
function pad (n, width, z) {
  const filler = z || '0'
  const text = n + ''
  if (text.length >= width) {
    return text
  }
  const missing = width - text.length
  // Joining (missing + 1) empty slots yields exactly `missing` fillers.
  return new Array(missing + 1).join(filler) + text
}
/**
 * Builds a date string from fields 1 to 5 of a split CSV row.
 *
 * Each field is left-padded to at least two characters, then the fields are
 * joined as `f1-f2-f3:f4f5`.
 * NOTE(review): presumably the fields are year, month, day, hours and
 * minutes - confirm against the CSV schema.
 *
 * @param {Array} content - Fields of one CSV row.
 * @returns {string} The assembled date string.
 */
function parseDate (content) {
  const parts = [1, 2, 3, 4, 5].map(function (index) {
    return pad(content[index], 2)
  })
  return `${parts[0]}-${parts[1]}-${parts[2]}:${parts[3]}${parts[4]}`
}
/**
 * Builds the `a:b` coordinate fragment from fields 13 and 14 of a CSV row.
 *
 * Both fields are parsed as base-10 integers (double quotes are stripped from
 * field 14) and divided by 100000. The larger value is always written first.
 * NOTE(review): presumably the fields hold latitude/longitude in 1e-5
 * degrees and "larger first" is a heuristic to undo swapped columns -
 * confirm against the CSV schema.
 *
 * @param {Array<string>} content - Fields of one CSV row.
 * @returns {string} `first:second`, or '' when either field is missing,
 *   'NA', unparseable, when field 13 is not greater than zero, or when
 *   field 14 is the literal '"0"'.
 */
function getLatLong (content) {
  const rawFirst = content[13]
  const rawSecond = content[14]

  // Short or malformed rows may not have the coordinate columns at all.
  if (rawFirst === undefined || rawSecond === undefined) {
    return ''
  }
  if (rawFirst === 'NA' || rawSecond === 'NA' || !(rawFirst > 0) || rawSecond === '"0"') {
    return ''
  }

  const first = parseInt(rawFirst, 10)
  const second = parseInt(String(rawSecond).replace(/"/g, ''), 10)
  if (Number.isNaN(first) || Number.isNaN(second)) {
    return ''
  }

  // Divide after parsing so the fractional part is kept in both orderings.
  const larger = Math.max(first, second) / 100000
  const smaller = Math.min(first, second) / 100000
  return larger + ':' + smaller
}
/**
 * POSTs a payload to the update endpoint on localhost:8080, authenticating
 * with the `X-Warp10-Token` header.
 *
 * The HTTP status of a successful response is logged; any failure is logged
 * with console.error and not rethrown.
 *
 * @param {string} payload - Request body to upload.
 * @returns {Promise<void>} Settles once the request has completed and been
 *   logged; it never rejects because errors are handled here.
 */
function send (payload) {
  return axios.post('http://localhost:8080/api/v0/update', payload, {
    headers: {
      'X-Warp10-Token': '<YOUR WRITE TOKEN>'
    }
  })
    .then((res) => {
      // axios exposes the HTTP status code as `status`; `statusCode` does
      // not exist on the response object and always logged `undefined`.
      console.log(`statusCode: ${res.status}`)
    })
    .catch((error) => {
      console.error(error)
    })
}
// Streams caracteristics.csv line by line, turns every row after the first
// (presumably the CSV header) into one record appended to `data`, and uploads
// the accumulated records in batches through send().
var s = fs.createReadStream('caracteristics.csv')
  .pipe(es.split())
  .pipe(es.mapSync(function (line) {
    // A file ending with a newline produces a final empty chunk; blank lines
    // carry no record and must not be turned into one.
    if (line.trim() === '') {
      return
    }
    var content = line.split(',')
    if (lineTotal > 0) {
      // 'DD' is the dayjs day-of-month token ('JJ' is not a token, so the day
      // was never parsed) and 'HH' is the 24-hour token.
      // Milliseconds are multiplied by 1000 before being written.
      var timestamp = dayjs(parseDate(content), 'YY-MM-DD:HHmm').valueOf() * 1000
      if (Number.isNaN(timestamp)) {
        // An invalid date would otherwise be written as "NaN/..." into the batch.
        console.warn(`Skipping record ${lineTotal}: unparseable date`)
      } else {
        data += timestamp + '/' + getLatLong(content) + '/ accidents{col=' + content[9] + ',dep=' + content[15] + '} 1\n'
      }
      if (lineCount > 1000 && data !== '') {
        send(data)
        data = ''
        lineCount = 0
      }
    }
    lineCount++
    lineTotal++
    // No pause()/resume() here: the handler is fully synchronous, so there is
    // nothing to wait for before the next line.
  })
    .on('error', function (err) {
      console.log('Error while reading file.', err)
    })
    .on('end', function () {
      // Upload whatever is left over from the last, partially filled batch.
      if (data !== '') {
        send(data)
        data = ''
        lineCount = 0
      }
      console.log('Read entire file.')
    })
  )