Skip to content

Commit

Permalink
Merge pull request #9268 from 4Science/coar-notify-7-part-two
Browse files Browse the repository at this point in the history
Coar Notify Integration - Administer/Log
  • Loading branch information
tdonohue authored Mar 4, 2024
2 parents e90ab9e + 57a52f6 commit eee0bfd
Show file tree
Hide file tree
Showing 26 changed files with 2,181 additions and 14 deletions.
41 changes: 41 additions & 0 deletions dspace-api/src/main/java/org/dspace/app/ldn/LDNMessageEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.dspace.app.ldn;

import java.lang.reflect.Field;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
Expand Down Expand Up @@ -34,6 +35,12 @@ public class LDNMessageEntity implements ReloadableEntity<String> {
* LDN messages interact with a fictitious queue. Scheduled tasks manage the queue.
*/

/*
* Notification Type constants
*/
public static final String TYPE_INCOMING = "Incoming";
public static final String TYPE_OUTGOING = "Outgoing";

/**
* Message must not be processed.
*/
Expand Down Expand Up @@ -69,6 +76,11 @@ public class LDNMessageEntity implements ReloadableEntity<String> {
*/
public static final Integer QUEUE_STATUS_UNMAPPED_ACTION = 6;

/**
* Message queued for retry, it has to be elaborated.
*/
public static final Integer QUEUE_STATUS_QUEUED_FOR_RETRY = 7;

@Id
private String id;

Expand Down Expand Up @@ -275,4 +287,33 @@ public void setSourceIp(String sourceIp) {
public String toString() {
return "LDNMessage id:" + this.getID() + " typed:" + this.getType();
}

public static String getNotificationType(LDNMessageEntity ldnMessage) {
if (ldnMessage.getInReplyTo() != null || ldnMessage.getOrigin() != null) {
return TYPE_INCOMING;
}
return TYPE_OUTGOING;
}

public static String getServiceNameForNotifyServ(NotifyServiceEntity serviceEntity) {
if (serviceEntity != null) {
return serviceEntity.getName();
}
return "self";
}

public static String getQueueStatus(LDNMessageEntity ldnMessage) {
Class<LDNMessageEntity> cl = LDNMessageEntity.class;
try {
for (Field f : cl.getDeclaredFields()) {
String fieldName = f.getName();
if (fieldName.startsWith("QUEUE_") && (f.get(null) == ldnMessage.getQueueStatus())) {
return fieldName;
}
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException(e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ public List<LDNMessageEntity> findAllMessagesByItem(
*/
public List<LDNMessageEntity> findAllRelatedMessagesByItem(
Context context, LDNMessageEntity msg, Item item, String... relatedTypes) throws SQLException;

/**
*
* @param context
* @return the list of messages in need to be reprocessed - with queue_status as QUEUE_STATUS_QUEUED_FOR_RETRY
* @throws SQLException
*/
public List<LDNMessageEntity> findMessagesToBeReprocessed(Context context) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,29 @@ public List<LDNMessageEntity> findOldestMessageToProcess(Context context, int ma
return result;
}

@Override
public List<LDNMessageEntity> findMessagesToBeReprocessed(Context context) throws SQLException {
// looking for LDN Messages to be reprocessed message
CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context);
CriteriaQuery<LDNMessageEntity> criteriaQuery = getCriteriaQuery(criteriaBuilder, LDNMessageEntity.class);
Root<LDNMessageEntity> root = criteriaQuery.from(LDNMessageEntity.class);
criteriaQuery.select(root);
List<Predicate> andPredicates = new ArrayList<>(1);
andPredicates
.add(criteriaBuilder.equal(root.get(LDNMessageEntity_.queueStatus),
LDNMessageEntity.QUEUE_STATUS_QUEUED_FOR_RETRY));
criteriaQuery.where(criteriaBuilder.and(andPredicates.toArray(new Predicate[] {})));
List<Order> orderList = new LinkedList<>();
orderList.add(criteriaBuilder.desc(root.get(LDNMessageEntity_.queueAttempts)));
orderList.add(criteriaBuilder.asc(root.get(LDNMessageEntity_.queueLastStartTime)));
criteriaQuery.orderBy(orderList);
List<LDNMessageEntity> result = list(context, criteriaQuery, false, LDNMessageEntity.class, -1, -1);
if (result == null || result.isEmpty()) {
log.debug("No LDN messages found to be processed");
}
return result;
}

@Override
public List<LDNMessageEntity> findProcessingTimedoutMessages(Context context, int max_attempts)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,18 @@ public interface LDNMessageService {
public void delete(Context context, LDNMessageEntity ldnMessage) throws SQLException;

/**
* check if IP number is included in the configured ip-range on the Notify Service
* find the ldn messages to be reprocessed
*
* @param context the context
* @throws SQLException if something goes wrong
*/
public List<LDNMessageEntity> findMessagesToBeReprocessed(Context context) throws SQLException;

/**
* check if IP number is included in the configured ip-range on the Notify
* Service
*
* @param origin the Notify Service entity
* @param origin the Notify Service entity
* @param sourceIp the ip to evaluate
*/
public boolean isValidIp(NotifyServiceEntity origin, String sourceIp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
Expand All @@ -38,7 +39,10 @@
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.service.ItemService;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.discovery.indexobject.IndexableLDNNotification;
import org.dspace.event.Event;
import org.dspace.handle.service.HandleService;
import org.dspace.services.ConfigurationService;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -65,13 +69,20 @@ public class LDNMessageServiceImpl implements LDNMessageService {
private LDNRouter ldnRouter;

private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(LDNMessageServiceImpl.class);
private static final String LDN_ID_PREFIX = "urn:uuid:";

protected LDNMessageServiceImpl() {

}

@Override
public LDNMessageEntity find(Context context, String id) throws SQLException {

if (id == null) {
return null;
}

id = id.startsWith(LDN_ID_PREFIX) ? id : LDN_ID_PREFIX + id;
return ldnMessageDao.findByID(context, LDNMessageEntity.class, id);
}

Expand Down Expand Up @@ -171,11 +182,19 @@ private long ipToLong(InetAddress ip) {

@Override
public void update(Context context, LDNMessageEntity ldnMessage) throws SQLException {
// move the queue_status from UNTRUSTED to QUEUED if origin is a known NotifyService
if (ldnMessage.getOrigin() != null &&
LDNMessageEntity.QUEUE_STATUS_UNTRUSTED.compareTo(ldnMessage.getQueueStatus()) == 0) {
ldnMessage.setQueueStatus(LDNMessageEntity.QUEUE_STATUS_QUEUED);
}
ldnMessageDao.save(context, ldnMessage);
UUID notificationUUID = UUID.fromString(ldnMessage.getID().replace(LDN_ID_PREFIX, ""));
ArrayList<String> identifiersList = new ArrayList<String>();
identifiersList.add(ldnMessage.getID());
context.addEvent(
new Event(Event.MODIFY, Constants.LDN_MESSAGE,
notificationUUID,
IndexableLDNNotification.TYPE, identifiersList));
}

private DSpaceObject findDspaceObjectByUrl(Context context, String url) throws SQLException {
Expand Down Expand Up @@ -210,6 +229,13 @@ public List<LDNMessageEntity> findOldestMessagesToProcess(Context context) throw
return result;
}

@Override
public List<LDNMessageEntity> findMessagesToBeReprocessed(Context context) throws SQLException {
List<LDNMessageEntity> result = null;
result = ldnMessageDao.findMessagesToBeReprocessed(context);
return result;
}

@Override
public List<LDNMessageEntity> findProcessingTimedoutMessages(Context context) throws SQLException {
List<LDNMessageEntity> result = null;
Expand All @@ -225,9 +251,10 @@ public int extractAndProcessMessageFromQueue(Context context) throws SQLExceptio
if (timeoutInMinutes == 0) {
timeoutInMinutes = 60;
}
List<LDNMessageEntity> msgs = null;
List<LDNMessageEntity> msgs = new ArrayList<LDNMessageEntity>();
try {
msgs = findOldestMessagesToProcess(context);
msgs.addAll(findOldestMessagesToProcess(context));
msgs.addAll(findMessagesToBeReprocessed(context));
if (msgs != null && msgs.size() > 0) {
LDNMessageEntity msg = null;
LDNProcessor processor = null;
Expand All @@ -253,9 +280,14 @@ public int extractAndProcessMessageFromQueue(Context context) throws SQLExceptio
processor.process(context, notification);
msg.setQueueStatus(LDNMessageEntity.QUEUE_STATUS_PROCESSED);
result = 1;
} catch (JsonSyntaxException jse) {
result = -1;
log.error("Unable to read JSON notification from LdnMessage " + msg, jse);
msg.setQueueStatus(LDNMessageEntity.QUEUE_STATUS_FAILED);
} catch (Exception e) {
result = -1;
log.error(e);
msg.setQueueStatus(LDNMessageEntity.QUEUE_STATUS_FAILED);
} finally {
msg.setQueueAttempts(msg.getQueueAttempts() + 1);
update(context, msg);
Expand Down
7 changes: 6 additions & 1 deletion dspace-api/src/main/java/org/dspace/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ public class Constants {
*/
public static final int EPERSON = 7;

/**
* Type of LDN MESSAGE objects
*/
public static final int LDN_MESSAGE = 8;

/**
* lets you look up type names from the type IDs
*/
public static final String[] typeText = { "BITSTREAM", "BUNDLE", "ITEM", "COLLECTION", "COMMUNITY", "SITE", "GROUP",
"EPERSON"};
"EPERSON", "LDN_MESSAGE"};

/**
* Special Bundle and Bitstream Names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,22 @@ public void consume(Context ctx, Event event) throws Exception {

int st = event.getSubjectType();
if (!(st == Constants.ITEM || st == Constants.BUNDLE
|| st == Constants.COLLECTION || st == Constants.COMMUNITY || st == Constants.SITE)) {
|| st == Constants.COLLECTION || st == Constants.COMMUNITY || st == Constants.SITE
|| st == Constants.LDN_MESSAGE)) {
log
.warn("IndexConsumer should not have been given this kind of Subject in an event, skipping: "
+ event.toString());
return;
}

DSpaceObject subject = event.getSubject(ctx);

DSpaceObject object = event.getObject(ctx);

DSpaceObject subject = null;
DSpaceObject object = null;
try {
subject = event.getSubject(ctx);
object = event.getObject(ctx);
} catch (Exception e) {
log.warn("Could not find the related DSpace Object for event subject: " + st);
}

// If event subject is a Bundle and event was Add or Remove,
// transform the event to be a Modify on the owning Item.
Expand All @@ -110,7 +115,7 @@ public void consume(Context ctx, Event event) throws Exception {
case Event.MODIFY:
case Event.MODIFY_METADATA:
if (subject == null) {
if (st == Constants.SITE) {
if (st == Constants.SITE || st == Constants.LDN_MESSAGE) {
// Update the indexable objects of type in event.detail of objects with ids in event.identifiers
for (String id : event.getIdentifiers()) {
IndexFactory indexableObjectService = IndexObjectFactoryFactory.getInstance().
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://www.dspace.org/license/
*/
package org.dspace.discovery.indexobject;

import org.dspace.app.ldn.LDNMessageEntity;
import org.dspace.discovery.IndexableObject;

/**
* {@link LDNMessageEntity} implementation for the {@link IndexableObject}
*
* @author Stefano Maffei at 4science.com
*/
public class IndexableLDNNotification extends AbstractIndexableObject<LDNMessageEntity, String> {

private LDNMessageEntity ldnMessage;
public static final String TYPE = LDNMessageEntity.class.getSimpleName();

public IndexableLDNNotification(LDNMessageEntity ldnMessage) {
super();
this.ldnMessage = ldnMessage;
}

@Override
public String getType() {
return getTypeText();
}

@Override
public String getID() {
return ldnMessage.getID();
}

@Override
public LDNMessageEntity getIndexedObject() {
return ldnMessage;
}

@Override
public void setIndexedObject(LDNMessageEntity object) {
this.ldnMessage = object;
}

@Override
public String getTypeText() {
return TYPE;
}

}
Loading

0 comments on commit eee0bfd

Please sign in to comment.