Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: db schema change #79

Merged
merged 14 commits into from
Dec 13, 2024
1 change: 1 addition & 0 deletions appinfo/routes.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/**
* Nextcloud - ContextChat
*
Expand Down
381 changes: 235 additions & 146 deletions composer.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions lib/AppInfo/Application.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/**
* Nextcloud - ContextChat
*
Expand All @@ -11,6 +12,8 @@

use OCA\ContextChat\Listener\AppDisableListener;
use OCA\ContextChat\Listener\FileListener;
use OCA\ContextChat\Listener\ShareListener;
use OCA\ContextChat\Listener\UserDeletedListener;
use OCA\ContextChat\Service\ProviderConfigService;
use OCA\ContextChat\TaskProcessing\ContextChatProvider;
use OCA\ContextChat\TaskProcessing\ContextChatTaskType;
Expand All @@ -27,11 +30,12 @@
use OCP\IConfig;
use OCP\Share\Events\ShareCreatedEvent;
use OCP\Share\Events\ShareDeletedEvent;
use OCP\User\Events\UserDeletedEvent;

class Application extends App implements IBootstrap {

public const APP_ID = 'context_chat';
public const MIN_APP_API_VERSION = '2.0.3';
public const MIN_APP_API_VERSION = '3.0.0';

public const CC_DEFAULT_REQUEST_TIMEOUT = 60 * 50; // 50 mins
// max size per file + max size of the batch of files to be embedded in a single request
Expand Down Expand Up @@ -71,12 +75,13 @@ public function __construct(array $urlParams = []) {
public function register(IRegistrationContext $context): void {
$context->registerEventListener(BeforeNodeDeletedEvent::class, FileListener::class);
$context->registerEventListener(NodeCreatedEvent::class, FileListener::class);
$context->registerEventListener(ShareCreatedEvent::class, FileListener::class);
$context->registerEventListener(ShareDeletedEvent::class, FileListener::class);
$context->registerEventListener(CacheEntryInsertedEvent::class, FileListener::class);
$context->registerEventListener(NodeRemovedFromCache::class, FileListener::class);
$context->registerEventListener(NodeWrittenEvent::class, FileListener::class);
$context->registerEventListener(AppDisableEvent::class, AppDisableListener::class);
$context->registerEventListener(UserDeletedEvent::class, UserDeletedListener::class);
Copy link
Member

@marcelklehr marcelklehr Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we also need Group member added/removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding it later, together with complete group-based ACLs? Then we won't need to translate group -> users; we would only need the list of groups the user is part of, fetched on the fly when querying.
In the backend, it would be checked whether a file can be accessed by the user or by one of the groups the user is part of.
We could maintain a local list of user-group mappings in the backend, but fetching it on the fly is not expensive, so we wouldn't even need a listener for when a user is added to or removed from a group.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmh. I'd stick with user-based ACLs for this iteration, I think.

$context->registerEventListener(ShareCreatedEvent::class, ShareListener::class);
$context->registerEventListener(ShareDeletedEvent::class, ShareListener::class);
$context->registerTaskProcessingTaskType(ContextChatTaskType::class);
$context->registerTaskProcessingProvider(ContextChatProvider::class);

Expand Down
122 changes: 122 additions & 0 deletions lib/BackgroundJobs/ActionJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

/**
* Nextcloud - ContextChat
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Anupam Kumar <[email protected]>
* @copyright Anupam Kumar 2024
*/

declare(strict_types=1);
namespace OCA\ContextChat\BackgroundJobs;

use OCA\ContextChat\Db\QueueActionMapper;
use OCA\ContextChat\Service\DiagnosticService;
use OCA\ContextChat\Service\LangRopeService;
use OCA\ContextChat\Type\ActionType;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\BackgroundJob\QueuedJob;
use Psr\Log\LoggerInterface;

/**
 * Queued background job that processes a batch of queued actions.
 *
 * Each run takes a batch of entities from the action queue, dispatches every
 * entity to the matching LangRopeService call based on its action type,
 * removes the whole batch from the queue once all entities were dispatched
 * without an exception, and finally re-adds itself to the job list.
 */
class ActionJob extends QueuedJob {
	private const BATCH_SIZE = 100;

	public function __construct(
		ITimeFactory $timeFactory,
		private LangRopeService $networkService,
		private QueueActionMapper $actionMapper,
		private IJobList $jobList,
		private LoggerInterface $logger,
		private DiagnosticService $diagnosticService,
	) {
		parent::__construct($timeFactory);
	}

	/**
	 * Process one batch of queued actions.
	 *
	 * @param mixed $argument Job argument; not used by this job.
	 * @throws \Throwable Anything thrown while dispatching or dequeuing is
	 *                    re-thrown after the job has been re-added to the job list.
	 */
	protected function run($argument): void {
		$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
		// self:: (not static::) because BATCH_SIZE is private: late static binding
		// would break as soon as a subclass declares its own BATCH_SIZE.
		$entities = $this->actionMapper->getFromQueue(self::BATCH_SIZE);

		if (empty($entities)) {
			// Nothing queued: return without re-adding the job.
			return;
		}

		try {
			foreach ($entities as $entity) {
				$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
				$this->processAction($entity);
			}

			// Entities are only dequeued after the complete batch was dispatched,
			// so a failure above leaves the whole batch in the queue.
			foreach ($entities as $entity) {
				$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
				$this->actionMapper->removeFromQueue($entity);
			}
		} catch (\Throwable $e) {
			// Re-add the job so the remaining batch gets another attempt, then
			// let the exception propagate to the caller.
			$this->jobList->add(static::class);
			throw $e;
		}

		// Re-add the job so that a following run can pick up further entities.
		$this->jobList->add(static::class);
	}

	/**
	 * Dispatch a single queued action to the network service.
	 *
	 * Entities with an invalid payload or an unknown type are logged as a
	 * warning and otherwise ignored (they are still dequeued by run()).
	 *
	 * @param mixed $entity Queue entity exposing getType() and getPayload().
	 */
	private function processAction($entity): void {
		switch ($entity->getType()) {
			case ActionType::DELETE_SOURCE_IDS:
				$decoded = $this->decodePayload($entity, 'DELETE_SOURCE_IDS', ['sourceIds']);
				if ($decoded !== null) {
					$this->networkService->deleteSources($decoded['sourceIds']);
				}
				break;

			case ActionType::DELETE_PROVIDER_ID:
				$decoded = $this->decodePayload($entity, 'DELETE_PROVIDER_ID', ['providerId']);
				if ($decoded !== null) {
					$this->networkService->deleteProvider($decoded['providerId']);
				}
				break;

			case ActionType::DELETE_USER_ID:
				$decoded = $this->decodePayload($entity, 'DELETE_USER_ID', ['userId']);
				if ($decoded !== null) {
					$this->networkService->deleteUser($decoded['userId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_SOURCE_ID:
				$decoded = $this->decodePayload($entity, 'UPDATE_ACCESS_SOURCE_ID', ['op', 'userIds', 'sourceId']);
				if ($decoded !== null) {
					$this->networkService->updateAccess($decoded['op'], $decoded['userIds'], $decoded['sourceId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_PROVIDER_ID:
				$decoded = $this->decodePayload($entity, 'UPDATE_ACCESS_PROVIDER_ID', ['op', 'userIds', 'providerId']);
				if ($decoded !== null) {
					$this->networkService->updateAccessProvider($decoded['op'], $decoded['userIds'], $decoded['providerId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_DECL_SOURCE_ID:
				$decoded = $this->decodePayload($entity, 'UPDATE_ACCESS_DECL_SOURCE_ID', ['userIds', 'sourceId']);
				if ($decoded !== null) {
					$this->networkService->updateAccessDeclarative($decoded['userIds'], $decoded['sourceId']);
				}
				break;

			default:
				$this->logger->warning('Unknown action type', ['type' => $entity->getType()]);
		}
	}

	/**
	 * Decode an entity's JSON payload and check that all required keys are set.
	 *
	 * @param mixed $entity Queue entity exposing getPayload().
	 * @param string $actionName Action name used in the warning message.
	 * @param string[] $requiredKeys Keys that must be present and non-null.
	 * @return array|null The decoded payload, or null (after logging a warning)
	 *                    when it is not a JSON array/object or a key is missing.
	 */
	private function decodePayload($entity, string $actionName, array $requiredKeys): ?array {
		$payload = $entity->getPayload();
		$decoded = json_decode($payload, true);

		$valid = is_array($decoded);
		if ($valid) {
			foreach ($requiredKeys as $key) {
				// isset() also rejects keys whose value is null, as before.
				if (!isset($decoded[$key])) {
					$valid = false;
					break;
				}
			}
		}

		if (!$valid) {
			$this->logger->warning('Invalid payload for ' . $actionName . ' action', ['payload' => $payload]);
			return null;
		}

		return $decoded;
	}
}
74 changes: 0 additions & 74 deletions lib/BackgroundJobs/DeleteJob.php

This file was deleted.

105 changes: 70 additions & 35 deletions lib/BackgroundJobs/IndexerJob.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/*
* Copyright (c) 2022 The Recognize contributors.
* This file is licensed under the Affero General Public License version 3 or later. See the COPYING file.
Expand All @@ -7,6 +8,7 @@

namespace OCA\ContextChat\BackgroundJobs;

use OCA\ContextChat\AppInfo\Application;
use OCA\ContextChat\Db\QueueFile;
use OCA\ContextChat\Service\DiagnosticService;
use OCA\ContextChat\Service\LangRopeService;
Expand Down Expand Up @@ -34,15 +36,15 @@ class IndexerJob extends TimedJob {
public const DEFAULT_MAX_INDEXING_TIME = 30 * 60;

public function __construct(
ITimeFactory $time,
ITimeFactory $time,
private LoggerInterface $logger,
private QueueService $queue,
private QueueService $queue,
private IUserMountCache $userMountCache,
private IJobList $jobList,
private IJobList $jobList,
private LangRopeService $langRopeService,
private StorageService $storageService,
private IRootFolder $rootFolder,
private IAppConfig $appConfig,
private StorageService $storageService,
private IRootFolder $rootFolder,
private IAppConfig $appConfig,
private DiagnosticService $diagnosticService,
) {
parent::__construct($time);
Expand Down Expand Up @@ -130,49 +132,82 @@ protected function getMaxIndexingTime(): int {
protected function index(array $files): void {
$maxTime = $this->getMaxIndexingTime();
$startTime = time();
$sources = [];
$allSourceIds = [];
$loadedSources = [];
$retryQFiles = [];
$size = 0;

foreach ($files as $queueFile) {
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
if ($startTime + $maxTime < time()) {
break;
}

$file = current($this->rootFolder->getById($queueFile->getFileId()));
if (!$file instanceof File) {
continue;
}

$file_size = $file->getSize();
if ($size + $file_size > Application::CC_MAX_SIZE || count($sources) >= Application::CC_MAX_FILES) {
$loadedSources = array_merge($loadedSources, $this->langRopeService->indexSources($sources));
$sources = [];
$size = 0;
}

$userIds = $this->storageService->getUsersForFileId($queueFile->getFileId());
foreach ($userIds as $userId) {
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());

try {
try {
try {
$fileHandle = $file->fopen('r');
} catch (LockedException|NotPermittedException $e) {
$this->logger->error('Could not open file ' . $file->getPath() . ' for reading', ['exception' => $e]);
continue;
}
if (!is_resource($fileHandle)) {
$this->logger->warning('File handle for' . $file->getPath() . ' is not readable');
continue;
}
$source = new Source(
$userId,
ProviderConfigService::getSourceId($file->getId()),
$file->getPath(),
$fileHandle,
$file->getMtime(),
$file->getMimeType(),
ProviderConfigService::getDefaultProviderKey(),
);
} catch (InvalidPathException|NotFoundException $e) {
$this->logger->error('Could not find file ' . $file->getPath(), ['exception' => $e]);
continue 2;
$fileHandle = $file->fopen('r');
} catch (NotPermittedException $e) {
$this->logger->error('Could not open file ' . $file->getPath() . ' for reading', ['exception' => $e]);
continue;
} catch (LockedException $e) {
$retryQFiles[] = $queueFile;
$this->logger->info('File ' . $file->getPath() . ' is locked, could not read for indexing. Adding it to the next batch.');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smart!

continue;
}
$this->langRopeService->indexSources([$source]);
if (!is_resource($fileHandle)) {
$this->logger->warning('File handle for' . $file->getPath() . ' is not readable');
continue;
}

$sources[] = new Source(
$userIds,
ProviderConfigService::getSourceId($file->getId()),
substr($file->getInternalPath(), 6), // remove 'files/' prefix
$fileHandle,
$file->getMtime(),
$file->getMimeType(),
ProviderConfigService::getDefaultProviderKey(),
);
$allSourceIds[] = ProviderConfigService::getSourceId($file->getId());
} catch (InvalidPathException|NotFoundException $e) {
$this->logger->error('Could not find file ' . $file->getPath(), ['exception' => $e]);
continue;
}
try {
$this->queue->removeFromQueue($queueFile);
} catch (Exception $e) {
$this->logger->error('Could not remove file from queue', ['exception' => $e]);
}

if (count($sources) > 0) {
$loadedSources = array_merge($loadedSources, $this->langRopeService->indexSources($sources));
}

$emptyInvalidSources = array_diff($allSourceIds, $loadedSources);
if (count($emptyInvalidSources) > 0) {
$this->logger->info('Invalid or empty sources that were not indexed', ['sourceIds' => $emptyInvalidSources]);
}

try {
$this->queue->removeFromQueue($files);
// add files that were locked to the end of the queue
foreach ($retryQFiles as $queueFile) {
$this->queue->insertIntoQueue($queueFile);
}
} catch (Exception $e) {
$this->logger->error('Could not remove indexed files from queue', ['exception' => $e]);
}
}
}
Loading
Loading