Skip to content
Snippets Groups Projects
Commit 6f024f06 authored by Sven Weber's avatar Sven Weber
Browse files

Splitting logs into local and global storage

parent beac9f37
No related branches found
No related tags found
No related merge requests found
......@@ -394,9 +394,9 @@ class FLApiClient extends EventEmitter
* @param {*} stream a stream containing the logs (tar archive)
* @returns A Promise that resolves when the push was successful and fails otherwise
*/
async PushLogs(stream)
async PushGlobalStorage(stream)
{
return this.#post(`federatedjobinfo/${process.env.STATION_ID}/${this.#jobId}/logs`, stream, 'application/x-tar', true);
return this.#post(`federatedjobinfo/${process.env.STATION_ID}/${this.#jobId}/storage`, stream, 'application/x-tar', true);
}
/**
......
......@@ -11,8 +11,10 @@
module.exports = {
FLModelPath: "/federated/model",
FLModelPathEnvName: "FEDERATED_MODEL_PATH",
FLLogsPath: "/federated/logs",
FLLogsPathEnvName: "FEDERATED_LOGS_PATH",
FLModelPathEnvName: "FEDERATED_MODEL_PATH",
FLGlobalStorageEnvName: "FEDERATED_GLOBAL_STORAGE",
FLGlobalStoragePath: "/federated/global_storage",
FLLocalStorageEnvName: "FEDERATED_LOCAL_STORAGE",
FLLocalStoragePath: "/federated/local_storage",
FLStatusUpdates,
}
\ No newline at end of file
const { FLModelPath, FLModelPathEnvName, FLLogsPath, FLLogsPathEnvName} = require('./flConstants.js');
const {
FLModelPath, FLModelPathEnvName,
FLGlobalStorageEnvName, FLGlobalStoragePath,
FLLocalStorageEnvName, FLLocalStoragePath } = require('./flConstants.js');
const { FLJob, FLJobState } = require('../models/FLJob');
const FederatedLogger = require('./federatedLogger.js');
const { EventType } = require('./flEventType.js');
......@@ -240,13 +243,24 @@ class FLJobWorker extends EventEmitter
* @param {*} jobInfo jobInfo object for the current FL job
* @param {*} container the container for this round
*/
async #restoreLogFilesIfNeeded(self, jobInfo, container)
async #restoreStorageFilesIfNeeded(self, jobInfo, container)
{
if (jobInfo.lastLogGridId)
if (jobInfo.lastLocalStorageGridId || jobInfo.lastGlobalStorageGridId)
{
self.#log("Putting existing log files into container", false);
let logFilesStream = await GridFsClient.readFromGridFs(jobInfo.lastLogGridId);
await container.putArchive(logFilesStream, { path: FLLogsPath });
self.#log("Putting existing storage files into container", false);
if (jobInfo.lastLocalStorageGridId)
{
let localStorage = await GridFsClient.readFromGridFs(jobInfo.lastLocalStorageGridId);
await container.putArchive(localStorage, { path: FLLocalStoragePath });
}
if (jobInfo.lastGlobalStorageGridId)
{
let globalStorage = await GridFsClient.readFromGridFs(jobInfo.lastGlobalStorageGridId);
await container.putArchive(globalStorage, { path: FLGlobalStoragePath });
}
self.#log("Existing log files successfully put into container", false);
}
}
......@@ -270,23 +284,27 @@ class FLJobWorker extends EventEmitter
//Set the env variables variable to the correct values
let jobInfo = await self.#loadJobInfo();
let envs = jobInfo.envs.concat([`${FLModelPathEnvName}=${FLModelPath}`,
`${FLLogsPathEnvName}=${FLLogsPath}`]);
let envs = jobInfo.envs.concat([
`${FLModelPathEnvName}=${FLModelPath}`,
`${FLGlobalStorageEnvName}=${FLGlobalStoragePath}`,
`${FLLocalStorageEnvName}=${FLLocalStoragePath}`
]);
let container = await dockerUtil.createContainerFromImage(
jobInfo.trainstoragelocation,
self.#getContainerName(self),
envs
);
//Ensure the FLDataPath and FLLogsPath exists
//Ensure the all needed paths exist
await dockerUtil.ensurePathExistsInContainer(container, FLModelPath);
await dockerUtil.ensurePathExistsInContainer(container, FLLogsPath);
await dockerUtil.ensurePathExistsInContainer(container, FLGlobalStoragePath);
await dockerUtil.ensurePathExistsInContainer(container, FLLocalStoragePath);
//Update model when needed
await self.#updateContainerModel(self, modelStream, container);
//Restore Logs of they exist
await self.#restoreLogFilesIfNeeded(self, jobInfo, container);
await self.#restoreStorageFilesIfNeeded(self, jobInfo, container);
self.#log("Container created successfully", true);
res(container);
......@@ -368,124 +386,86 @@ class FLJobWorker extends EventEmitter
}
/**
* @param {*} jobInfo JobInfo object
* @param {JobWorker} self A refrence to this class
* @returns the filename in the gridFS that should be used for the result model
*/
#getGridFsModelFileName(jobInfo)
#getGridFsModelFileName(self)
{
return `${jobInfo.jobid}_model`;
return `${self.#jobId}_model`;
}
/**
* @param {*} jobInfo JobInfo object
* @returns the filename in the gridFS that should be used for the result logs
* @param {JobWorker} self A refrence to this class
* @returns the filename in the gridFS that should be used for the global storage
*/
#getGridFsLogsFileName(jobInfo)
#getGridFsGlobalStorageFileName(self)
{
return `${jobInfo.jobid}_logs`;
return `${self.#jobId}_global_storage`;
}
async #updateGridFsFromStream(self, existingId, filename, stream)
{
try
{
self.#log(`Putting file ${filename} into the gridFS`, false);
//Remove if old id exists, then store new stream
await GridFsClient.removeFromGridFsIfDefiend(existingId)
let id = await GridFsClient.storeInGridFs(filename, stream);
self.#log(`Putting file ${filename} into the gridFS success with new id ${id}`, false);
return id;
} catch (err)
{
self.#log(`Something went wrong while storing the model in gridFS: ${err}`, true)
throw new Error(`Learning failed because something went wrong during gridFs storage for the model`);
}
}
/**
* Stores the given logStream in the gridFs
* @param {*} self
* @param {*} extractedLogsStream
* @returns {*} an readable stream from the gridFS with the stored file
/**
* @param {JobWorker} self A refrence to this class
* @returns the filename in the gridFS that should be used for the local storage
*/
async #storeLogsInGridFS(self, extractedLogsStream)
#getGridFsLocalStorageFileName(self)
{
try
{
let jobInfo = await self.#loadJobInfo();
self.#log(`Putting logs into gridFs`, false);
let logsId = await self.#updateGridFsFromStream(self, jobInfo.lastLogGridId, self.#getGridFsLogsFileName(jobInfo),
extractedLogsStream);
//Store the new ids in the db
await self.#updateJobInfo(self, (info) => {
info.lastLogGridId = logsId;
});
self.#log(`Putting logs into gridFs success`, false);
} catch (err)
{
self.#log(`Something went wrong while storing the logs in gridFS: ${err}`, true)
throw new Error(`Learning failed because something went wrong during gridFs storage of the job logs`);
}
return `${self.#jobId}_local_storage`;
}
/**
* Stores the provided modelStream in the gridFs
* Stores the given stream in the gridFs with the provided filename and updates the given jobInfo property with the new id
* @param {*} self
* @param {*} extractedModelStream
* @param {*} extractedLogsStream
* @returns {*} an readable stream from the gridFS with the stored file
*/
async #storeModelInGridFS(self, extractedModelStream)
async #storeStreamInGridFSAndUpdateJobInfo(self, extractedStream, filename, infoProperty)
{
try
{
try {
let jobInfo = await self.#loadJobInfo();
self.#log(`Putting model into gridFs`, false);
let resultId = await self.#updateGridFsFromStream(self, jobInfo.lastResultGridId, self.#getGridFsModelFileName(jobInfo),
extractedModelStream);
self.#log(`Putting ${filename} into gridFs from stream`, false);
//Remove if old id exists, then store new stream
await GridFsClient.removeFromGridFsIfDefiend(jobInfo[infoProperty])
let id = await GridFsClient.storeInGridFs(filename, extractedStream);
//Store the new ids in the db
await self.#updateJobInfo(self, (info) => {
info.lastResultGridId = resultId;
info[infoProperty] = id;
});
self.#log(`Putting model into gridFs success`, false);
self.#log(`Putting ${filename} into gridFs from stream success`, false);
} catch (err)
{
self.#log(`Something went wrong while storing the model in gridFS: ${err}`, true)
throw new Error(`Learning failed because something went wrong during gridFs storage of the job model`);
self.#log(`Something went wrong while storing ${filename} in gridFS: ${err}`, true)
throw new Error(`Learning failed because something went wrong during gridFs storage one of the job results`);
}
}
}
/**
* Pushes the model and logs as the learning results of this round to the CS
* Pushes the model and global storage as the learning results of this round to the CS
* @param {*} self Instance to this class (selfreference)
*/
async #pushLogsAndModelToCs(self)
async #pushGlobalStorageAndModelToCs(self)
{
//Load the Job
let jobInfo = await self.#loadJobInfo();
//Push logs first since a model might directly lead to aggregation (if this is the last model)
//Push global storage first since a model might directly lead to aggregation (if this is the last model)
//And then the jobs are potentially missing in the output
//Important: pushing the logs is not mission critical, therefore the job does
//not fail when the logs cannot be pushed
//Important: pushing the global storage is not mission critical, therefore the job does
//not fail when the global storage cannot be pushed
//silent failure in this case
//-> Since logs are restored every round, the next round (if any) will retry sending them
//-> Since global storage are restored every round, the next round (if any) will retry sending them
try {
//Push Logs to CS
self.#log("Pushing Logs... ", true);
let logsStream = await GridFsClient.readFromGridFs(jobInfo.lastLogGridId);
await self.#apiClient.PushLogs(logsStream);
self.#log("Pushing Global Storage... ", true);
let storageStream = await GridFsClient.readFromGridFs(jobInfo.lastGlobalStorageGridId);
await self.#apiClient.PushGlobalStorage(storageStream);
self.#log("done.", true);
} catch (err)
{
self.#log(`Something went wrong while pushing the logs to the CS: ${err}`, true)
self.#log(`Something went wrong while pushing the global storage to the CS: ${err}`, true)
}
//Push Model to CS
......@@ -527,6 +507,35 @@ class FLJobWorker extends EventEmitter
}
}
/**
* Extracts the results, stores them in gridFS and updates the JobInfo accordingly
* @param {*} self
* @param {*} container
*/
async #extractResults(self, container)
{
//Extract, store in gridFS
//Important: This needs to be done after one another, you cannot read all archives
//and then store in gridFS (you need to read, store, read, store)
//The reason for this is that the docker API waits for the first request for finish before
//executing the second request after a certain file size is reached. This then results in the
//second request pending forever and the station to not progress at all...
let extractedModelStream = await dockerUtil.extractArchive(container, FLModelPath);
await self.#storeStreamInGridFSAndUpdateJobInfo(
self, extractedModelStream,
self.#getGridFsModelFileName(self), 'lastResultGridId'
)
let extractedGlobalStorage = await dockerUtil.extractArchive(container, FLGlobalStoragePath);
await self.#storeStreamInGridFSAndUpdateJobInfo(
self, extractedGlobalStorage,
self.#getGridFsGlobalStorageFileName(self), 'lastGlobalStorageGridId'
);
let extractedLocalStorage = await dockerUtil.extractArchive(container, FLLocalStoragePath);
await self.#storeStreamInGridFSAndUpdateJobInfo(self, extractedLocalStorage,
self.#getGridFsLocalStorageFileName(self), 'lastLocalStorageGridId'
);
}
/**
* Runs one FL round
* @param {*} self
......@@ -554,21 +563,12 @@ class FLJobWorker extends EventEmitter
await self.#throwErrorWhenCancled(self);
self.#log("Extracting results", true);
await self.#updateJobInfoState(self, FLJobState.PUSHING_RESULTS);
//Extract, store in gridFS
//Important: This needs to be done after one another, you cannot read both archives
//and then store in gridFS (you need to read, store, read, store)
//The reason for this is that the docker API waits for the first request for finish before
//executing the second request after a certain file size is reached. This then results in the
//second request pending forever and the station to not progress at all...
let extractedModelStream = await dockerUtil.extractArchive(container, FLModelPath);
await self.#storeModelInGridFS(self, extractedModelStream);
let extractedLogsStream = await dockerUtil.extractArchive(container, FLLogsPath);
await self.#storeLogsInGridFS(self, extractedLogsStream, extractedModelStream);
await self.#extractResults(self, container);
self.#log("Results extracted... Pushing", true);
//Push model and logs to CS
//Push model and global storage to CS
await self.#throwErrorWhenCancled(self);
await self.#pushLogsAndModelToCs(self)
await self.#pushGlobalStorageAndModelToCs(self)
self.#log("Pushing successful", true);
//Make decision for next round
......
......@@ -63,7 +63,12 @@ const FederatedSchema = new mongoose.Schema({
type: String,
required: false
},
lastLogGridId:
lastGlobalStorageGridId:
{
type: String,
required: false
},
lastLocalStorageGridId:
{
type: String,
required: false
......
......@@ -212,7 +212,8 @@ router.get('/delete/:id', ensureAuthenticated, async (req, res) => {
{
//Delete existing gridFS entries
await GridFsClient.removeFromGridFsIfDefiend(job.lastResultGridId);
await GridFsClient.removeFromGridFsIfDefiend(job.lastLogGridId);
await GridFsClient.removeFromGridFsIfDefiend(job.lastGlobalStorageGridId);
await GridFsClient.removeFromGridFsIfDefiend(job.lastLocalStorageGridId);
//Delete docker image when not used by anybody else
let otherJobs = await FLJob.find({ trainstoragelocation: job.trainstoragelocation, jobid: { $ne : job.jobid} })
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment